IT기술/webflux (reactor)

WebFlux와 Reactor 디버깅 완벽 가이드: 비동기 프로그래밍에서의 효과적인 문제 해결

후스파 2025. 7. 8. 07:08
반응형

WebFlux와 Reactor를 사용할 때 디버깅은 중요한 작업입니다.
비동기적이고 반응형 프로그래밍 모델에서는 예외 처리 및 디버깅 방법이 동기식 프로그래밍과는 다소 다릅니다.
다음은 WebFlux와 Reactor에서 디버깅을 수행하는 방법에 대한 개요입니다.


Reactor 디버깅

Reactor는 비동기 스트림 처리 라이브러리로, 디버깅을 위해 몇 가지 유용한 도구와 기법을 제공합니다.

doOnNext(), doOnError(), doOnComplete()

이런 메서드들은 스트림의 각 단계에서 특정 작업을 수행하도록 도와줍니다. 이를 통해 데이터 흐름을 추적하고, 오류를 더 쉽게 찾을 수 있습니다.

import reactor.core.publisher.Mono;

Mono mono = Mono.just("Hello")
    .doOnNext(data -> System.out.println("Received: " + data))
    .map(data -> {
        if (data.equals("Hello")) {
            throw new RuntimeException("Error occurred!");
        }
        return data.toUpperCase();
    })
    .doOnError(error -> System.err.println("Error: " + error.getMessage()))
    .doOnComplete(() -> System.out.println("Completed!"));

mono.subscribe();

위의 예제에서는 doOnNext()를 사용하여 수신된 데이터를 출력하고, doOnError()를 통해 발생한 오류를 출력합니다.

log() 메서드

log() 메서드는 스트림의 모든 이벤트(구독, 데이터 수신, 오류 발생, 완료)를 로깅할 수 있도록 도와줍니다. 이 메서드는 디버깅 중에 매우 유용합니다.

import reactor.core.publisher.Flux;

Flux flux = Flux.just("A", "B", "C")
    .log();

flux.subscribe();

이 코드를 실행하면 각 이벤트에 대한 로그를 콘솔에서 확인할 수 있습니다.

고급 로깅 기법

public class AdvancedLoggingExample {
    private static final Logger log = LoggerFactory.getLogger(AdvancedLoggingExample.class);

    public Mono processUserData(String userId) {
        return Mono.just(userId)
            .log("UserProcessing", Level.INFO) // 커스텀 로그 레벨과 카테고리
            .doOnSubscribe(subscription -> log.info("Starting user processing for: {}", userId))
            .flatMap(this::fetchUserFromDatabase)
            .doOnNext(user -> log.debug("User fetched: {}", user))
            .map(this::transformUserData)
            .doOnNext(transformed -> log.debug("Data transformed: {}", transformed))
            .doOnSuccess(result -> log.info("Processing completed for user: {}", userId))
            .doOnError(error -> log.error("Error processing user {}: {}", userId, error.getMessage()))
            .doFinally(signalType -> log.info("Processing finished with signal: {}", signalType));
    }
}

고급 디버깅 기법

Hooks.onOperatorDebug() - 전역 디버그 모드

Hooks.onOperatorDebug()는 Reactor에 모든 연산자에 대해 디버그 모드를 활성화하도록 지시하여, 더 자세한 오류 메시지와 스택 트레이스를 제공합니다.

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        // 애플리케이션 시작 시 전역 디버그 모드 활성화
        Hooks.onOperatorDebug();
        SpringApplication.run(Application.class, args);
    }
}

@RestController
public class UserController {
    public Mono greeting(@PathVariable String firstName, @PathVariable String lastName) {
        return Flux.fromIterable(Arrays.asList(firstName, lastName))
            .filter(name -> !name.equals("John")) // John이 아닌 이름만 필터링
            .single() // 하나의 요소만 방출 (두 개 이상이면 예외 발생)
            .map(String::toUpperCase);
    }
}

Hooks.onOperatorDebug()를 추가하면 더 유용한 스택 트레이스를 얻을 수 있습니다:

java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
    ...
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
    reactor.core.publisher.Flux.single(Flux.java:8336)
    com.example.UserController.greeting(UserController.java:42)

checkpoint() - 특정 지점 디버깅

checkpoint() 연산자를 사용하여 특정 지점에서만 디버깅 정보를 수집할 수 있습니다.

public Mono processWithCheckpoints(String data) {
    return Mono.just(data)
        .checkpoint("Initial data processing")
        .map(this::validateData)
        .checkpoint("After validation")
        .flatMap(this::enrichData)
        .checkpoint("After enrichment")
        .map(this::finalTransformation)
        .checkpoint("Final transformation completed");
}

ReactorDebugAgent - 성능 최적화된 디버깅

// VM 옵션으로 추가: -javaagent:reactor-tools-3.x.x.jar
// 또는 프로그래밍 방식으로 활성화
public class Application {
    static {
        ReactorDebugAgent.init();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

예외 처리

WebFlux에서는 비동기 작업 중 발생하는 예외를 처리하는 방법이 중요합니다. 일반적으로, 비동기식 프로그래밍에서는 예외가 발생하면 호출 스택이 파괴되기 때문에, 예외 처리를 명시적으로 구현해야 합니다.

onErrorReturn(), onErrorResume()

이 메서드들은 스트림에서 발생한 예외를 처리하는 데 사용됩니다.
onErrorReturn(): 예외가 발생했을 때 기본 값을 반환합니다.

Flux flux = Flux.just("A", "B", "C")
    .map(data -> {
        if (data.equals("B")) {
            throw new RuntimeException("Error!");
        }
        return data;
    })
    .onErrorReturn("Fallback");

flux.subscribe(System.out::println); // A, Fallback

onErrorResume(): 예외가 발생했을 때 다른 스트림을 반환할 수 있습니다.

Flux flux = Flux.just("A", "B", "C")
    .map(data -> {
        if (data.equals("B")) {
            throw new RuntimeException("Error!");
        }
        return data;
    })
    .onErrorResume(error -> {
        System.err.println("Error: " + error.getMessage());
        return Flux.just("Fallback1", "Fallback2"); // 대체 스트림 반환
    });

flux.subscribe(System.out::println); // A, Fallback1, Fallback2

고급 예외 처리 패턴

@Service
public class UserService {
    private static final Logger log = LoggerFactory.getLogger(UserService.class);

    public Mono getUserWithFallback(String userId) {
        return userRepository.findById(userId)
            .cast(User.class)
            .onErrorResume(DatabaseException.class, ex -> {
                log.warn("Database error for user {}, trying cache", userId, ex);
                return cacheService.getUser(userId);
            })
            .onErrorResume(CacheException.class, ex -> {
                log.warn("Cache error for user {}, using default", userId, ex);
                return Mono.just(createDefaultUser(userId));
            })
            .onErrorMap(Exception.class, ex -> 
                new UserServiceException("Failed to get user: " + userId, ex))
            .doOnError(ex -> log.error("Unhandled error for user: {}", userId, ex));
    }

    public Flux getAllUsersWithRetry() {
        return userRepository.findAll()
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .filter(throwable -> throwable instanceof TransientException))
            .onErrorContinue((throwable, obj) -> {
                log.error("Error processing user object: {}", obj, throwable);
            })
            .timeout(Duration.ofSeconds(30))
            .onErrorResume(TimeoutException.class, ex -> {
                log.error("Timeout getting all users", ex);
                return Flux.empty();
            });
    }
}

WebFlux 디버깅 방법

로깅 전략

@Component
public class LoggingWebFilter implements WebFilter {
    private static final Logger log = LoggerFactory.getLogger(LoggingWebFilter.class);

    @Override
    public Mono filter(ServerWebExchange exchange, WebFilterChain chain) {
        String requestId = UUID.randomUUID().toString();
        ServerHttpRequest request = exchange.getRequest();

        long startTime = System.currentTimeMillis();

        return chain.filter(exchange)
            .doOnSubscribe(subscription -> 
                log.info("Request started - ID: {}, Method: {}, Path: {}", 
                    requestId, request.getMethod(), request.getPath()))
            .doOnSuccess(result -> 
                log.info("Request completed - ID: {}, Duration: {}ms", 
                    requestId, System.currentTimeMillis() - startTime))
            .doOnError(error -> 
                log.error("Request failed - ID: {}, Duration: {}ms, Error: {}", 
                    requestId, System.currentTimeMillis() - startTime, error.getMessage()))
            .contextWrite(Context.of("requestId", requestId));
    }
}

테스트 환경에서의 디버깅

@ExtendWith(SpringExtension.class)
@WebFluxTest(UserController.class)
class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserService userService;

    @Test
    void testUserEndpointWithDebugging() {
        // Given
        String userId = "test-user";
        User expectedUser = new User(userId, "Test User");

        when(userService.getUser(userId))
            .thenReturn(Mono.just(expectedUser)
                .log("UserService.getUser") // 테스트에서 로깅 활성화
                .checkpoint("User service call completed"));

        // When & Then
        webTestClient.get()
            .uri("/users/{id}", userId)
            .exchange()
            .expectStatus().isOk()
            .expectBody(User.class)
            .value(user -> {
                assertThat(user.getId()).isEqualTo(userId);
                assertThat(user.getName()).isEqualTo("Test User");
            });
    }

    @Test
    void testErrorHandlingWithStepVerifier() {
        // Given
        String userId = "error-user";
        when(userService.getUser(userId))
            .thenReturn(Mono.error(new UserNotFoundException("User not found")));

        // When & Then
        StepVerifier.create(userService.getUser(userId))
            .expectErrorMatches(throwable -> 
                throwable instanceof UserNotFoundException &&
                throwable.getMessage().equals("User not found"))
            .verify();
    }
}

프로덕션 환경 모니터링

@Component
public class ReactiveMetrics {
    private final MeterRegistry meterRegistry;
    private final Counter errorCounter;
    private final Timer requestTimer;

    public ReactiveMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.errorCounter = Counter.builder("reactive.errors")
            .description("Number of reactive stream errors")
            .register(meterRegistry);
        this.requestTimer = Timer.builder("reactive.requests")
            .description("Request processing time")
            .register(meterRegistry);
    }

    public  Mono monitorMono(Mono mono, String operation) {
        return mono
            .name(operation)
            .metrics()
            .doOnError(error -> {
                errorCounter.increment(
                    Tags.of(
                        Tag.of("operation", operation),
                        Tag.of("error.type", error.getClass().getSimpleName())
                    )
                );
            })
            .transformDeferred(Operators.lift((scannable, coreSubscriber) -> 
                new TimedSubscriber<>(coreSubscriber, requestTimer, operation)));
    }
}

결론

WebFlux와 Reactor에서 디버깅은 비동기적이고 반응형 프로그래밍 모델에 맞춰 수행되어야 합니다.
doOnNext(), doOnError(), log()와 같은 메서드를 활용하여 데이터 흐름을 추적하고, 예외 처리 메커니즘을 통해 발생하는 오류를 관리할 수 있습니다.
핵심 포인트:

  • Hooks.onOperatorDebug()로 전역 디버그 모드 활성화하여 상세한 스택 트레이스 확보
  • checkpoint() 연산자로 특정 지점에서만 디버깅 정보 수집
  • log() 메서드로 스트림 이벤트 실시간 모니터링
  • onErrorReturn/onErrorResume으로 우아한 예외 처리
  • ReactorDebugAgent로 성능 최적화된 프로덕션 디버깅
  • Context와 MDC 통합으로 요청별 추적 가능

리액티브 프로그래밍에서 효과적인 디버깅을 위해서는 전통적인 동기식 디버깅 방법과는 다른 접근이 필요하며, Reactor가 제공하는 다양한 도구들을 적절히 조합하여 사용하는 것이 중요합니다.

반응형