
WebFlux와 Reactor를 사용할 때 디버깅은 중요한 작업입니다.
비동기적이고 반응형 프로그래밍 모델에서는 예외 처리 및 디버깅 방법이 동기식 프로그래밍과는 다소 다릅니다.
다음은 WebFlux와 Reactor에서 디버깅을 수행하는 방법에 대한 개요입니다.
Reactor 디버깅
Reactor는 비동기 스트림 처리 라이브러리로, 디버깅을 위해 몇 가지 유용한 도구와 기법을 제공합니다.
doOnNext(), doOnError(), doOnComplete()
이런 메서드들은 스트림의 각 단계에서 특정 작업을 수행하도록 도와줍니다. 이를 통해 데이터 흐름을 추적하고, 오류를 더 쉽게 찾을 수 있습니다.
import reactor.core.publisher.Mono;
Mono mono = Mono.just("Hello")
.doOnNext(data -> System.out.println("Received: " + data))
.map(data -> {
if (data.equals("Hello")) {
throw new RuntimeException("Error occurred!");
}
return data.toUpperCase();
})
.doOnError(error -> System.err.println("Error: " + error.getMessage()))
.doOnComplete(() -> System.out.println("Completed!"));
mono.subscribe();위의 예제에서는 doOnNext()를 사용하여 수신된 데이터를 출력하고, doOnError()를 통해 발생한 오류를 출력합니다.
log() 메서드
log() 메서드는 스트림의 모든 이벤트(구독, 데이터 수신, 오류 발생, 완료)를 로깅할 수 있도록 도와줍니다. 이 메서드는 디버깅 중에 매우 유용합니다.
import reactor.core.publisher.Flux;
Flux flux = Flux.just("A", "B", "C")
.log();
flux.subscribe();이 코드를 실행하면 각 이벤트에 대한 로그를 콘솔에서 확인할 수 있습니다.
고급 로깅 기법
public class AdvancedLoggingExample {
private static final Logger log = LoggerFactory.getLogger(AdvancedLoggingExample.class);
public Mono processUserData(String userId) {
return Mono.just(userId)
.log("UserProcessing", Level.INFO) // 커스텀 로그 레벨과 카테고리
.doOnSubscribe(subscription -> log.info("Starting user processing for: {}", userId))
.flatMap(this::fetchUserFromDatabase)
.doOnNext(user -> log.debug("User fetched: {}", user))
.map(this::transformUserData)
.doOnNext(transformed -> log.debug("Data transformed: {}", transformed))
.doOnSuccess(result -> log.info("Processing completed for user: {}", userId))
.doOnError(error -> log.error("Error processing user {}: {}", userId, error.getMessage()))
.doFinally(signalType -> log.info("Processing finished with signal: {}", signalType));
}
}고급 디버깅 기법
Hooks.onOperatorDebug() - 전역 디버그 모드
Hooks.onOperatorDebug()는 Reactor에 모든 연산자에 대해 디버그 모드를 활성화하도록 지시하여, 더 자세한 오류 메시지와 스택 트레이스를 제공합니다.
@SpringBootApplication
public class Application {
public static void main(String[] args) {
// 애플리케이션 시작 시 전역 디버그 모드 활성화
Hooks.onOperatorDebug();
SpringApplication.run(Application.class, args);
}
}
@RestController
public class UserController {
public Mono greeting(@PathVariable String firstName, @PathVariable String lastName) {
return Flux.fromIterable(Arrays.asList(firstName, lastName))
.filter(name -> !name.equals("John")) // John이 아닌 이름만 필터링
.single() // 하나의 요소만 방출 (두 개 이상이면 예외 발생)
.map(String::toUpperCase);
}
}Hooks.onOperatorDebug()를 추가하면 더 유용한 스택 트레이스를 얻을 수 있습니다:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
...
Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Flux.single(Flux.java:8336)
com.example.UserController.greeting(UserController.java:42)checkpoint() - 특정 지점 디버깅
checkpoint() 연산자를 사용하여 특정 지점에서만 디버깅 정보를 수집할 수 있습니다.
public Mono processWithCheckpoints(String data) {
return Mono.just(data)
.checkpoint("Initial data processing")
.map(this::validateData)
.checkpoint("After validation")
.flatMap(this::enrichData)
.checkpoint("After enrichment")
.map(this::finalTransformation)
.checkpoint("Final transformation completed");
}ReactorDebugAgent - 성능 최적화된 디버깅
// VM 옵션으로 추가: -javaagent:reactor-tools-3.x.x.jar
// 또는 프로그래밍 방식으로 활성화
public class Application {
static {
ReactorDebugAgent.init();
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}예외 처리
WebFlux에서는 비동기 작업 중 발생하는 예외를 처리하는 방법이 중요합니다. 일반적으로, 비동기식 프로그래밍에서는 예외가 발생하면 호출 스택이 파괴되기 때문에, 예외 처리를 명시적으로 구현해야 합니다.
onErrorReturn(), onErrorResume()
이 메서드들은 스트림에서 발생한 예외를 처리하는 데 사용됩니다.
onErrorReturn(): 예외가 발생했을 때 기본 값을 반환합니다.
Flux flux = Flux.just("A", "B", "C")
.map(data -> {
if (data.equals("B")) {
throw new RuntimeException("Error!");
}
return data;
})
.onErrorReturn("Fallback");
flux.subscribe(System.out::println); // A, FallbackonErrorResume(): 예외가 발생했을 때 다른 스트림을 반환할 수 있습니다.
Flux flux = Flux.just("A", "B", "C")
.map(data -> {
if (data.equals("B")) {
throw new RuntimeException("Error!");
}
return data;
})
.onErrorResume(error -> {
System.err.println("Error: " + error.getMessage());
return Flux.just("Fallback1", "Fallback2"); // 대체 스트림 반환
});
flux.subscribe(System.out::println); // A, Fallback1, Fallback2고급 예외 처리 패턴
@Service
public class UserService {
private static final Logger log = LoggerFactory.getLogger(UserService.class);
public Mono getUserWithFallback(String userId) {
return userRepository.findById(userId)
.cast(User.class)
.onErrorResume(DatabaseException.class, ex -> {
log.warn("Database error for user {}, trying cache", userId, ex);
return cacheService.getUser(userId);
})
.onErrorResume(CacheException.class, ex -> {
log.warn("Cache error for user {}, using default", userId, ex);
return Mono.just(createDefaultUser(userId));
})
.onErrorMap(Exception.class, ex ->
new UserServiceException("Failed to get user: " + userId, ex))
.doOnError(ex -> log.error("Unhandled error for user: {}", userId, ex));
}
public Flux getAllUsersWithRetry() {
return userRepository.findAll()
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(throwable -> throwable instanceof TransientException))
.onErrorContinue((throwable, obj) -> {
log.error("Error processing user object: {}", obj, throwable);
})
.timeout(Duration.ofSeconds(30))
.onErrorResume(TimeoutException.class, ex -> {
log.error("Timeout getting all users", ex);
return Flux.empty();
});
}
}WebFlux 디버깅 방법
로깅 전략
@Component
public class LoggingWebFilter implements WebFilter {
private static final Logger log = LoggerFactory.getLogger(LoggingWebFilter.class);
@Override
public Mono filter(ServerWebExchange exchange, WebFilterChain chain) {
String requestId = UUID.randomUUID().toString();
ServerHttpRequest request = exchange.getRequest();
long startTime = System.currentTimeMillis();
return chain.filter(exchange)
.doOnSubscribe(subscription ->
log.info("Request started - ID: {}, Method: {}, Path: {}",
requestId, request.getMethod(), request.getPath()))
.doOnSuccess(result ->
log.info("Request completed - ID: {}, Duration: {}ms",
requestId, System.currentTimeMillis() - startTime))
.doOnError(error ->
log.error("Request failed - ID: {}, Duration: {}ms, Error: {}",
requestId, System.currentTimeMillis() - startTime, error.getMessage()))
.contextWrite(Context.of("requestId", requestId));
}
}테스트 환경에서의 디버깅
@ExtendWith(SpringExtension.class)
@WebFluxTest(UserController.class)
class UserControllerTest {
@Autowired
private WebTestClient webTestClient;
@MockBean
private UserService userService;
@Test
void testUserEndpointWithDebugging() {
// Given
String userId = "test-user";
User expectedUser = new User(userId, "Test User");
when(userService.getUser(userId))
.thenReturn(Mono.just(expectedUser)
.log("UserService.getUser") // 테스트에서 로깅 활성화
.checkpoint("User service call completed"));
// When & Then
webTestClient.get()
.uri("/users/{id}", userId)
.exchange()
.expectStatus().isOk()
.expectBody(User.class)
.value(user -> {
assertThat(user.getId()).isEqualTo(userId);
assertThat(user.getName()).isEqualTo("Test User");
});
}
@Test
void testErrorHandlingWithStepVerifier() {
// Given
String userId = "error-user";
when(userService.getUser(userId))
.thenReturn(Mono.error(new UserNotFoundException("User not found")));
// When & Then
StepVerifier.create(userService.getUser(userId))
.expectErrorMatches(throwable ->
throwable instanceof UserNotFoundException &&
throwable.getMessage().equals("User not found"))
.verify();
}
}프로덕션 환경 모니터링
@Component
public class ReactiveMetrics {
private final MeterRegistry meterRegistry;
private final Counter errorCounter;
private final Timer requestTimer;
public ReactiveMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.errorCounter = Counter.builder("reactive.errors")
.description("Number of reactive stream errors")
.register(meterRegistry);
this.requestTimer = Timer.builder("reactive.requests")
.description("Request processing time")
.register(meterRegistry);
}
public Mono monitorMono(Mono mono, String operation) {
return mono
.name(operation)
.metrics()
.doOnError(error -> {
errorCounter.increment(
Tags.of(
Tag.of("operation", operation),
Tag.of("error.type", error.getClass().getSimpleName())
)
);
})
.transformDeferred(Operators.lift((scannable, coreSubscriber) ->
new TimedSubscriber<>(coreSubscriber, requestTimer, operation)));
}
}결론
WebFlux와 Reactor에서 디버깅은 비동기적이고 반응형 프로그래밍 모델에 맞춰 수행되어야 합니다.
doOnNext(), doOnError(), log()와 같은 메서드를 활용하여 데이터 흐름을 추적하고, 예외 처리 메커니즘을 통해 발생하는 오류를 관리할 수 있습니다.
핵심 포인트:
- Hooks.onOperatorDebug()로 전역 디버그 모드 활성화하여 상세한 스택 트레이스 확보
- checkpoint() 연산자로 특정 지점에서만 디버깅 정보 수집
- log() 메서드로 스트림 이벤트 실시간 모니터링
- onErrorReturn/onErrorResume으로 우아한 예외 처리
- ReactorDebugAgent로 성능 최적화된 프로덕션 디버깅
- Context와 MDC 통합으로 요청별 추적 가능
리액티브 프로그래밍에서 효과적인 디버깅을 위해서는 전통적인 동기식 디버깅 방법과는 다른 접근이 필요하며, Reactor가 제공하는 다양한 도구들을 적절히 조합하여 사용하는 것이 중요합니다.
'IT기술 > webflux (reactor)' 카테고리의 다른 글
| WebFlux Context 완벽 가이드: 리액티브 프로그래밍에서의 상태 관리와 데이터 공유 (0) | 2025.07.06 |
|---|---|
| Java Optional 완벽 가이드: null 안전성을 위한 차세대 프로그래밍 패러다임 (0) | 2025.07.04 |
| [WebFlux] Scheduler의 역할과 활용 방법 (0) | 2025.04.29 |
| [WebFlux] 리액티브 스트림즈 Sinks: 핵심 개념과 활용 가이드 (0) | 2025.04.28 |
| [WebFlux] Backpressure란? – 리액티브 데이터 흐름 제어의 핵심 (6) | 2025.04.27 |