IT기술/webflux (reactor)

[WebFlux] 리액티브 스트림즈 Sinks: 핵심 개념과 활용 가이드

후스파 2025. 4. 28. 15:18
반응형

spring webflux reactor

Reactor의 Sinks는 리액티브 스트림즈에서 프로그래밍 방식으로 Signal을 전송하는 표준 메커니즘입니다. 기존 Processor의 한계를 극복하고, 멀티스레드 환경에서 안정적인 데이터 방출을 보장합니다.
(Reactor 3.4.0+부터 도입, Processor는 3.5.0+에서 deprecated)

 


 

1. Sinks의 주요 특징

  • Signal 수동 방출: onNext, onComplete, onError를 코드로 직접 제어 가능
  • 스레드 안전성(Thread Safety): 동시 접근 시 경쟁 조건 방지 (예: FAIL_FAST 전략)
  • 다중 구독자 지원: unicast, multicast, replay 등 다양한 전략 제공
  • 백프레셔 관리: 구독자의 요청에 따른 자동 흐름 제어

 


 

2. Sinks 종류 및 사용법

 

2.1. Sinks.One

단일 데이터 방출에 최적화된 싱크. Mono로 변환 가능.

Sinks.One sinkOne = Sinks.one();
Mono mono = sinkOne.asMono();

// 데이터 방출 (최초 1회만 성공)
sinkOne.emitValue("Hello", Sinks.EmitFailureHandler.FAIL_FAST);
sinkOne.emitValue("Hi", Sinks.EmitFailureHandler.FAIL_FAST); // 무시됨

// 구독
mono.subscribe(data -> System.out.println("Subscriber1: " + data));
mono.subscribe(data -> System.out.println("Subscriber2: " + data));

 

2.2. Sinks.Many

다중 데이터 방출 지원. Flux로 변환 가능. 세 가지 전략 존재.

 

(1) Unicast

  • 단일 구독자만 허용
  • 두 번째 구독자 접근 시 IllegalStateException 발생
Sinks.Many unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux flux = unicastSink.asFlux();

unicastSink.emitNext("Data1", FAIL_FAST);
unicastSink.emitNext("Data2", FAIL_FAST);

flux.subscribe(data -> System.out.println("Subscriber: " + data));

 

(2) Multicast

  • Hot Publisher: 구독 이후 방출된 데이터만 수신
  • 다중 구독자 지원 (기본: 모든 구독자 취소 시 버퍼 클리어)
Sinks.Many multicastSink = Sinks.many().multicast().directBestEffort();
Flux flux = multicastSink.asFlux();

multicastSink.emitNext("A", FAIL_FAST);
flux.subscribe(data -> System.out.println("Sub1: " + data)); // "A" 수신 X

multicastSink.emitNext("B", FAIL_FAST); // "B" 수신

 

(3) Replay

  • Cold Publisher: 구독 전 데이터도 재전송
  • all(): 전체 기록, limit(int): 최근 N개 기록
Sinks.Many replaySink = Sinks.many().replay().limit(2);
Flux flux = replaySink.asFlux();

replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);

flux.subscribe(data -> System.out.println("Sub1: " + data)); // 2, 3 출력

 


 

3. 주요 설정 옵션

 

3.1. EmitFailureHandler

  • FAIL_FAST: 실패 시 즉시 예외 발생 (권장)
  • FAIL_FAST_CANCEL: 실패 시 구독 취소
  • RETRY_WHEN: 재시도 정책 설정

 

3.2. Backpressure 전략

  • onBackpressureBuffer(): 버퍼 오버플로우 시 에러
  • onBackpressureError(): 버퍼 가득 찰 시 즉시 에러
  • onBackpressureDrop(): 버퍼 가득 찰 시 최신 데이터 드롭

 


 

4. 실전 활용 예시: 실시간 알림 시스템

@Service
public class NotificationService {
    private final Sinks.Many sink = Sinks.many().multicast().directBestEffort();

    public Flux getNotifications() {
        return sink.asFlux();
    }

    public void sendNotification(String message) {
        sink.emitNext(message, FAIL_FAST);
    }
}

// 컨트롤러
@GetMapping("/notifications")
public Flux streamNotifications() {
    return notificationService.getNotifications();
}

 


 

5. 결론: Sinks 사용 시 고려 사항

  1. 유스케이스에 맞는 전략 선택
    • 단일 구독자 → unicast
    • 실시간 브로드캐스팅 → multicast
    • 과거 데이터 재전송 필요 → replay
  2. 스레드 안전성 보장
    • emit* 메서드 외부에서 동기화 필수 (예: synchronized 블록)
  3. 리소스 관리
    • 구독 종료 시 sink.tryEmitComplete()로 명시적 종료 권장
  4. 모니터링
    • Sinks.EmitResult로 방출 결과 로깅 (성공/실패 추적)

 

Sinks는 리액티브 시스템에서 유연한 데이터 방출효율적인 구독 관리를 가능하게 하는 핵심 도구입니다. 전략적 선택과 올바른 사용법을 통해 안정적인 비동기 애플리케이션을 구축하세요!

 

 

[WebFlux] 리액티브 스트림즈 Sinks

Sinks란? Sinks는 리액티브 스트림즈에서 Signal을 전송하는 데 사용되는 구성 요소입니다. 이전 버전의...

blog.naver.com

 

 

[WebFlux] Backpressure란? – 리액티브 데이터 흐름 제어의 핵심

Backpressure(배압)는 리액티브 프로그래밍에서 데이터 생산자(Publisher)와 소비자(Subscriber) 간의 흐름 제어 메커니즘입니다. 시스템이 데이터 과부하를 방지하고 안정성을 보장하기 위해 반드시 이

hoosfa.tistory.com

 

반응형