반응형

Reactor의 Sinks는 리액티브 스트림즈에서 프로그래밍 방식으로 Signal을 전송하는 표준 메커니즘입니다. 기존 Processor의 한계를 극복하고, 멀티스레드 환경에서 안정적인 데이터 방출을 보장합니다.
(Reactor 3.4.0+부터 도입, Processor는 3.5.0+에서 deprecated)
1. Sinks의 주요 특징
- Signal 수동 방출:
onNext,onComplete,onError를 코드로 직접 제어 가능 - 스레드 안전성(Thread Safety): 동시 접근 시 경쟁 조건 방지 (예:
FAIL_FAST전략) - 다중 구독자 지원:
unicast,multicast,replay등 다양한 전략 제공 - 백프레셔 관리: 구독자의 요청에 따른 자동 흐름 제어
2. Sinks 종류 및 사용법
2.1. Sinks.One
단일 데이터 방출에 최적화된 싱크. Mono로 변환 가능.
Sinks.One sinkOne = Sinks.one();
Mono mono = sinkOne.asMono();
// 데이터 방출 (최초 1회만 성공)
sinkOne.emitValue("Hello", Sinks.EmitFailureHandler.FAIL_FAST);
sinkOne.emitValue("Hi", Sinks.EmitFailureHandler.FAIL_FAST); // 무시됨
// 구독
mono.subscribe(data -> System.out.println("Subscriber1: " + data));
mono.subscribe(data -> System.out.println("Subscriber2: " + data));
2.2. Sinks.Many
다중 데이터 방출 지원. Flux로 변환 가능. 세 가지 전략 존재.
(1) Unicast
- 단일 구독자만 허용
- 두 번째 구독자 접근 시
IllegalStateException발생
Sinks.Many unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux flux = unicastSink.asFlux();
unicastSink.emitNext("Data1", FAIL_FAST);
unicastSink.emitNext("Data2", FAIL_FAST);
flux.subscribe(data -> System.out.println("Subscriber: " + data));
(2) Multicast
- Hot Publisher: 구독 이후 방출된 데이터만 수신
- 다중 구독자 지원 (기본: 모든 구독자 취소 시 버퍼 클리어)
Sinks.Many multicastSink = Sinks.many().multicast().directBestEffort();
Flux flux = multicastSink.asFlux();
multicastSink.emitNext("A", FAIL_FAST);
flux.subscribe(data -> System.out.println("Sub1: " + data)); // "A" 수신 X
multicastSink.emitNext("B", FAIL_FAST); // "B" 수신
(3) Replay
- Cold Publisher: 구독 전 데이터도 재전송
all(): 전체 기록,limit(int): 최근 N개 기록
Sinks.Many replaySink = Sinks.many().replay().limit(2);
Flux flux = replaySink.asFlux();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);
flux.subscribe(data -> System.out.println("Sub1: " + data)); // 2, 3 출력
3. 주요 설정 옵션
3.1. EmitFailureHandler
FAIL_FAST: 실패 시 즉시 예외 발생 (권장)FAIL_FAST_CANCEL: 실패 시 구독 취소RETRY_WHEN: 재시도 정책 설정
3.2. Backpressure 전략
onBackpressureBuffer(): 버퍼 오버플로우 시 에러onBackpressureError(): 버퍼 가득 찰 시 즉시 에러onBackpressureDrop(): 버퍼 가득 찰 시 최신 데이터 드롭
4. 실전 활용 예시: 실시간 알림 시스템
@Service
public class NotificationService {
private final Sinks.Many sink = Sinks.many().multicast().directBestEffort();
public Flux getNotifications() {
return sink.asFlux();
}
public void sendNotification(String message) {
sink.emitNext(message, FAIL_FAST);
}
}
// 컨트롤러
@GetMapping("/notifications")
public Flux streamNotifications() {
return notificationService.getNotifications();
}
5. 결론: Sinks 사용 시 고려 사항
- 유스케이스에 맞는 전략 선택
- 단일 구독자 →
unicast - 실시간 브로드캐스팅 →
multicast - 과거 데이터 재전송 필요 →
replay
- 단일 구독자 →
- 스레드 안전성 보장
emit*메서드 외부에서 동기화 필수 (예:synchronized블록)
- 리소스 관리
- 구독 종료 시
sink.tryEmitComplete()로 명시적 종료 권장
- 구독 종료 시
- 모니터링
Sinks.EmitResult로 방출 결과 로깅 (성공/실패 추적)
Sinks는 리액티브 시스템에서 유연한 데이터 방출과 효율적인 구독 관리를 가능하게 하는 핵심 도구입니다. 전략적 선택과 올바른 사용법을 통해 안정적인 비동기 애플리케이션을 구축하세요!
[WebFlux] 리액티브 스트림즈 Sinks
Sinks란? Sinks는 리액티브 스트림즈에서 Signal을 전송하는 데 사용되는 구성 요소입니다. 이전 버전의...
blog.naver.com
[WebFlux] Backpressure란? – 리액티브 데이터 흐름 제어의 핵심
Backpressure(배압)는 리액티브 프로그래밍에서 데이터 생산자(Publisher)와 소비자(Subscriber) 간의 흐름 제어 메커니즘입니다. 시스템이 데이터 과부하를 방지하고 안정성을 보장하기 위해 반드시 이
hoosfa.tistory.com
반응형
'IT기술 > webflux (reactor)' 카테고리의 다른 글
| Java Optional 완벽 가이드: null 안전성을 위한 차세대 프로그래밍 패러다임 (0) | 2025.07.04 |
|---|---|
| [WebFlux] Scheduler의 역할과 활용 방법 (0) | 2025.04.29 |
| [WebFlux] Backpressure란? – 리액티브 데이터 흐름 제어의 핵심 (6) | 2025.04.27 |
| WebFlux Cold Sequence와 Hot Sequence: 데이터 흐름의 두 가지 방식 (0) | 2025.04.21 |
| [WebFlux] 마블 다이어그램(Marble Diagram): 리액티브 프로그래밍의 시각적 도구 (2) | 2025.04.12 |