IT기술/webflux (reactor)

WebFlux Context 완벽 가이드: 리액티브 프로그래밍에서의 상태 관리와 데이터 공유

후스파 2025. 7. 6. 07:29
반응형

WebFlux에서 Context는 비동기 프로그래밍 중에 필요한 정보를 저장하고 전달하는 메커니즘입니다.
이는 Reactor의 구성 요소 간에 데이터를 공유할 수 있도록 해주며, 특정 상황을 처리하기 위한 필요한 정보를 담고 있습니다. Context는 주로 구독이 발생할 때 생성되며, 각 구독에 대해 독립적인 상태를 유지합니다.


Context의 역할

상태 관리

각 구독마다 독립적인 Context가 생성되므로, 상태 정보를 안전하게 저장하고 전파할 수 있습니다.

데이터 공유

Operator 간에 공유할 수 있는 key-value 형태의 저장소로 작동하여, 같은 체인 내에서 동일한 정보를 사용할 수 있게 합니다.

비동기 처리

비동기적으로 처리되는 작업에서 데이터를 전달하기 위한 방법으로 사용됩니다.

스레드 안전성

Reactor의 Context는 실행 스레드와 매핑되는 것이 아니라, Subscriber와 매핑됩니다. 즉, 구독이 발생할 때마다 해당 구독과 연결된 하나의 Context가 생성되어 스레드 간 안전한 데이터 공유를 보장합니다.


Context 데이터 쓰기

contextWrite() operator를 사용하여 Context에 데이터를 쓸 수 있습니다. 이 연산자는 함수형 인터페이스를 파라미터로 받아, 람다 표현식으로 작성할 수 있습니다.

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

// 기본적인 Context 데이터 쓰기
Mono mono = Mono.just("Hello")
    .contextWrite(Context.of("key", "value"));

// 여러 데이터 저장
Mono multiDataMono = Mono.just("Hello")
    .contextWrite(Context.of("user", "john", "role", "admin"));

// 기존 Context에 데이터 추가
Mono addDataMono = Mono.just("Hello")
    .contextWrite(context -> context.put("timestamp", System.currentTimeMillis()));

위 코드에서 contextWrite()는 Context에 "key"와 "value"를 저장합니다. 데이터 쓰기는 Context API의 put() 메서드를 통해 이루어집니다.

Context 쓰기 고급 예제

public class AdvancedContextWriteExample {
    public static void main(String[] args) {
        // 사용자 인증 정보를 Context에 저장
        Mono userProcessing = Mono.just("Processing user data")
            .doOnNext(data -> System.out.println("Processing: " + data))
            .contextWrite(context -> 
                context.put("userId", "user123")
                       .put("sessionId", "session456")
                       .put("permissions", List.of("READ", "WRITE")))
            .contextWrite(Context.of("requestId", UUID.randomUUID().toString()));

        userProcessing.subscribe(System.out::println);
    }
}

Context 데이터 읽기

원본 데이터 소스에서 읽기

deferContextual() operator를 사용하여 원본 데이터 소스 레벨에서 Context에 저장된 데이터를 읽을 수 있습니다. 이때 파라미터는 ContextView 타입의 객체를 사용합니다. ContextView는 읽기 전용이며, 데이터를 안전하게 조회할 수 있도록 도와줍니다.

Mono result = Mono.deferContextual(ctx -> {
    String value = ctx.getOrDefault("key", "default");
    return Mono.just("Value: " + value);
});

Operator 체인 중간에서 읽기

transformDeferredContextual() operator를 사용하여 Operator 체인 중간에서 Context의 데이터를 읽을 수도 있습니다.

public class ContextReadExample {
    public static void main(String[] args) {
        String key = "message";

        Mono mono = Mono.deferContextual(ctx ->
            Mono.just("Hello " + ctx.get(key))
                .doOnNext(data -> System.out.println("First read: " + data))
        )
        .subscribeOn(Schedulers.boundedElastic())
        .publishOn(Schedulers.parallel())
        .transformDeferredContextual((mono2, ctx) -> 
            mono2.map(data -> data + " " + ctx.get(key)))
        .contextWrite(context -> context.put(key, "Reactor"));

        mono.subscribe(data -> System.out.println("Final result: " + data));
        // 출력: Hello Reactor Reactor
    }
}

실무 활용 예제

@Service
public class UserService {

    public Mono getUserWithContext(String userId) {
        return Mono.deferContextual(ctx -> {
            String requestId = ctx.getOrDefault("requestId", "unknown");
            String sessionId = ctx.getOrDefault("sessionId", "anonymous");

            return userRepository.findById(userId)
                .map(user -> new UserResponse(user, requestId, sessionId))
                .doOnNext(response -> 
                    log.info("User retrieved - RequestId: {}, SessionId: {}", 
                            requestId, sessionId));
        });
    }

    public Mono processUserData(String userId) {
        return getUserWithContext(userId)
            .transformDeferredContextual((userMono, ctx) -> {
                List permissions = ctx.getOrDefault("permissions", List.of());
                return userMono.filter(user -> permissions.contains("READ"))
                              .switchIfEmpty(Mono.error(new AccessDeniedException("No read permission")));
            })
            .map(user -> "Processed: " + user.getName());
    }
}

주요 Context API

Context 쓰기 API

// put(key, value): Context에 데이터를 저장
Context context = Context.of("key1", "value1")
                         .put("key2", "value2");

// of(key, value, ...): 여러 개의 데이터를 한 번에 저장
Context multiContext = Context.of(
    "user", "john",
    "role", "admin",
    "timestamp", System.currentTimeMillis()
);

// putAll(ContextView): 다른 ContextView를 합침
Context mergedContext = context.putAll(otherContextView);

// delete(key): 특정 키에 해당하는 데이터를 삭제
Context deletedContext = context.delete("key1");

// readOnly(): Context 객체를 ContextView로 변환
ContextView readOnlyView = context.readOnly();

주요 ContextView API

// get(key): 특정 키에 해당하는 값을 반환
String value = contextView.get("key");

// getOrEmpty(key): 값을 Optional로 래핑하여 반환
Optional optionalValue = contextView.getOrEmpty("key");

// getOrDefault(key, defaultValue): 값이 없을 경우 기본값을 반환
String valueWithDefault = contextView.getOrDefault("key", "defaultValue");

// hasKey(key): 특정 키의 존재 여부를 확인
boolean hasKey = contextView.hasKey("key");

// isEmpty(): Context가 비어 있는지 확인
boolean isEmpty = contextView.isEmpty();

// size(): 저장된 데이터의 개수를 반환
int size = contextView.size();

ContextView 활용 예제

public class ContextViewExample {
    public static void main(String[] args) {
        Mono processing = Mono.deferContextual(ctx -> {
            // 안전한 값 조회
            Optional userId = ctx.getOrEmpty("userId");
            String role = ctx.getOrDefault("role", "guest");

            if (userId.isPresent() && ctx.hasKey("permissions")) {
                List permissions = ctx.get("permissions");
                return Mono.just(String.format("User %s with role %s has %d permissions", 
                    userId.get(), role, permissions.size()));
            } else {
                return Mono.just("Anonymous user with role " + role);
            }
        })
        .contextWrite(Context.of(
            "userId", "user123",
            "role", "admin",
            "permissions", List.of("READ", "WRITE", "DELETE")
        ));

        processing.subscribe(System.out::println);
    }
}

Context의 특징

구독마다 Context 생성

구독이 발생할 때마다 독립적인 Context가 생성됩니다. 즉, subscribe() 호출 시마다 새로운 Context가 연결됩니다.

Mono source = Mono.just("Hello")
    .contextWrite(Context.of("key", "value"));

// 각 구독마다 독립적인 Context
source.subscribe(data -> System.out.println("Subscription 1: " + data));
source.subscribe(data -> System.out.println("Subscription 2: " + data));

전파 방식

Context는 Operator 체인에서 아래에서 위로 전파되므로, 모든 Operator에서 데이터를 사용하기 위해서는 체인의 마지막에서 contextWrite()를 호출해야 합니다.

// 올바른 Context 전파
Mono correctPropagation = Mono.deferContextual(ctx -> 
    Mono.just("Hello " + ctx.get("name"))
)
.map(String::toUpperCase)
.contextWrite(Context.of("name", "World")); // 체인의 마지막에 위치

// 잘못된 Context 전파 (읽기 전에 쓰기가 위치)
Mono incorrectPropagation = Mono.just("Hello")
    .contextWrite(Context.of("name", "World")) // 너무 일찍 위치
    .flatMap(value -> Mono.deferContextual(ctx -> 
        Mono.just(value + " " + ctx.getOrDefault("name", "Unknown"))
    ));

Inner Sequence에서의 Context 격리

public class InnerSequenceContextExample {
    public static void main(String[] args) {
        Mono outerMono = Mono.just("Outer")
            .flatMap(outer -> 
                Mono.just("Inner")
                    .contextWrite(Context.of("inner", "innerValue")) // Inner Sequence에서만 유효
                    .flatMap(inner -> Mono.deferContextual(ctx -> {
                        String innerValue = ctx.getOrDefault("inner", "not found");
                        String outerValue = ctx.getOrDefault("outer", "not found");
                        return Mono.just(String.format("%s-%s, outer: %s, inner: %s", 
                            outer, inner, outerValue, innerValue));
                    }))
            )
            .contextWrite(Context.of("outer", "outerValue"));

        outerMono.subscribe(System.out::println);
        // 출력: Outer-Inner, outer: outerValue, inner: innerValue
    }
}

실무 활용 예시

HTTP 요청에서의 Context 활용

@Component
public class ContextFilter implements WebFilter {

    @Override
    public Mono filter(ServerWebExchange exchange, WebFilterChain chain) {
        String requestId = UUID.randomUUID().toString();
        String userAgent = exchange.getRequest().getHeaders().getFirst("User-Agent");

        return chain.filter(exchange)
            .contextWrite(Context.of(
                "requestId", requestId,
                "userAgent", userAgent,
                "startTime", System.currentTimeMillis()
            ));
    }
}

@RestController
public class UserController {

    @GetMapping("/users/{id}")
    public Mono getUser(@PathVariable String id) {
        return Mono.deferContextual(ctx -> {
            String requestId = ctx.get("requestId");
            long startTime = ctx.get("startTime");

            return userService.findById(id)
                .doOnNext(user -> log.info("User found - RequestId: {}, Duration: {}ms", 
                    requestId, System.currentTimeMillis() - startTime))
                .map(user -> new UserResponse(user, requestId));
        });
    }
}

MDC와 Context 통합 (Spring Boot 3)

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
        // Spring Boot 3에서 자동 Context 전파 활성화
        Hooks.enableAutomaticContextPropagation();
    }
}

@Component
public class MdcLoggingFilter implements WebFilter {
    private static final String MDC_KEY = "_MDC_KEY_";

    @PostConstruct
    public void setUp() {
        // MDC와 Reactor Context 연동
        ContextRegistry.getInstance().registerThreadLocalAccessor(
            MDC_KEY,
            MDC::getCopyOfContextMap,
            MDC::setContextMap,
            MDC::clear
        );
    }

    @Override
    public Mono filter(ServerWebExchange exchange, WebFilterChain chain) {
        String requestId = UUID.randomUUID().toString();

        return chain.filter(exchange)
            .contextWrite(Context.of(MDC_KEY, Map.of(
                "requestId", requestId,
                "method", exchange.getRequest().getMethod().name(),
                "path", exchange.getRequest().getPath().value()
            )));
    }
}

예시

아래는 WebFlux의 Context를 활용한 간단한 예시입니다.

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextExample {
    public static void main(String[] args) {
        Mono mono = Mono.just("Hello")
            .contextWrite(Context.of("key", "value"))
            .flatMap(value -> Mono.deferContextual(ctx -> {
                String contextValue = ctx.getOrDefault("key", "default");
                return Mono.just(value + " " + contextValue);
            }));

        mono.subscribe(System.out::println); // 출력: Hello value
    }
}

이 예시에서는 Context에 "key"와 "value"를 저장하고, deferContextual()을 통해 해당 값을 읽어와 결과를 출력합니다.

복합 예제: 사용자 인증과 권한 확인

@Service
public class AuthenticationService {

    public Mono authenticateAndProcess(String token, String operation) {
        return validateToken(token)
            .flatMap(this::getUserPermissions)
            .flatMap(user -> processWithPermissions(operation)
                .contextWrite(Context.of(
                    "userId", user.getId(),
                    "username", user.getUsername(),
                    "permissions", user.getPermissions(),
                    "authTime", System.currentTimeMillis()
                ))
            );
    }

    private Mono processWithPermissions(String operation) {
        return Mono.deferContextual(ctx -> {
            List permissions = ctx.get("permissions");
            String username = ctx.get("username");

            if (permissions.contains(operation.toUpperCase())) {
                return Mono.just("User " + username + " successfully performed " + operation);
            } else {
                return Mono.error(new AccessDeniedException("Insufficient permissions"));
            }
        });
    }

    private Mono validateToken(String token) {
        // 토큰 검증 로직
        return Mono.just(new User("user123", "john", List.of("READ", "WRITE")));
    }

    private Mono getUserPermissions(User user) {
        // 사용자 권한 조회 로직
        return Mono.just(user);
    }
}

결론

WebFlux의 Context는 비동기 프로그래밍에서 데이터 상태를 관리하고 공유할 수 있는 강력한 도구입니다. 각 구독마다 독립적인 상태를 유지하고, Operator 간에 데이터를 안전하게 전달할 수 있도록 도와줍니다.
핵심 포인트:

  • 구독별 독립적인 Context 생성으로 스레드 안전성 보장
  • contextWrite()로 데이터 쓰기, deferContextual()로 데이터 읽기
  • 체인의 마지막에서 contextWrite() 호출하여 전체 체인에서 사용 가능
  • ContextView를 통한 안전한 읽기 전용 접근
  • Spring Boot 3에서 MDC와 자동 통합 지원

Context를 올바르게 활용하면 리액티브 스트림에서 상태 정보를 안전하고 효율적으로 관리할 수 있으며, 복잡한 비동기 처리 로직에서도 일관된 데이터 흐름을 유지할 수 있습니다.

반응형