archie-swif / webflux-mdc

36 stars 12 forks source link

Can't get value in different streams #2

Open jmilktea opened 4 years ago

jmilktea commented 4 years ago

The MDC value can't be read in a different (separately subscribed) stream.

E.g., setting the parameter in a WebFilter:

/**
 * Web filter that writes a fixed {@code "traceId"} entry (value {@code 123}) into the
 * Reactor subscriber context of the filter chain for every exchange it handles.
 */
@Component
public class MdcWebFilter implements WebFilter {

    /**
     * Delegates to the rest of the chain and attaches the {@code "traceId"} entry to the
     * subscriber context of the resulting {@code Mono}.
     *
     * @param serverWebExchange the current exchange, passed through to the chain unchanged
     * @param webFilterChain    the remaining filter chain to delegate to
     * @return the chain's completion signal, with {@code "traceId"} added to its context
     */
    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        Mono<Void> downstream = webFilterChain.filter(serverWebExchange);
        return downstream.subscriberContext(context -> context.put("traceId", 123));
    }
}
// Shared scheduler named "settlement-scheduler", created via Schedulers.newParallel(..., 30);
// both pipelines in test() are subscribed on it.
private static final Scheduler SETTLEMENT_SCHEDULER =
        Schedulers.newParallel("settlement-scheduler", 30);

/**
 * Reproducer endpoint for the report: prints the current thread id and the MDC
 * {@code "traceId"} on the calling thread, starts a separately subscribed {@code Mono}
 * on {@code SETTLEMENT_SCHEDULER}, and returns a {@code Flux} that is also configured to
 * be subscribed on {@code SETTLEMENT_SCHEDULER}.
 *
 * @return a {@code Flux} emitting the single value {@code 1}
 */
@GetMapping("/test")
public Flux<Integer> test() {
    // Reported output for the trace id at this point: 123
    System.out.println(Thread.currentThread().getId() + ":" + MDC.get("traceId"));

    // Detached pipeline: it is subscribed right here, independently of the returned Flux.
    Mono<Integer> detached = Mono.just(1)
            .map(value -> {
                thread1();
                return 1;
            })
            .subscribeOn(SETTLEMENT_SCHEDULER);
    detached.subscribe();

    // Returned pipeline: handed back without subscribing; the caller subscribes to it.
    Flux<Integer> response = Flux.just(1)
            .map(value -> {
                thread2();
                return 1;
            })
            .subscribeOn(SETTLEMENT_SCHEDULER);
    return response;
}

/**
 * Prints the current thread id and the MDC value stored under {@code "traceId"},
 * joined by a colon. In the original report the MDC value printed here was {@code null}.
 */
private void thread1() {
    final String line = Thread.currentThread().getId() + ":" + MDC.get("traceId");
    // Reported output for the trace id: null
    System.out.println(line);
}

/**
 * Prints the current thread id and the MDC value stored under {@code "traceId"},
 * joined by a colon. In the original report the MDC value printed here was {@code 123}.
 */
private void thread2() {
    final String line = Thread.currentThread().getId() + ":" + MDC.get("traceId");
    // Reported output for the trace id: 123
    System.out.println(line);
}
ghenadiibatalski commented 4 years ago

I think I know what the problem is. By calling subscribe() yourself, you become the subscriber of that inner stream, so it no longer has the original subscriber context containing the traceId. You need to carry the original subscriber context over into it, e.g.:

  // Suggested replacement for the body of test(). Reported output for the trace id here: 123
        System.out.println(Thread.currentThread().getId() + ":" + MDC.get("traceId"));

        // Read the context of the returned pipeline first, then hand it to the inner,
        // separately subscribed Mono so that Mono carries the same entries.
        return Mono.subscriberContext()
                .doOnNext(context -> Mono.just(1).subscriberContext(context) // take over the outer context for the inner Mono
                        .map(s -> {
                            thread1(); // per the author, the traceId is now available here
                            return 1;
                        }).doOnNext(integer -> {
                            // NOTE(review): CORRELATION_ID_KEY is not defined in this snippet; presumably it
                            // should match the "traceId" key used elsewhere - confirm before copying.
                            System.out.println(Thread.currentThread().getId() + ":" + MDC.get(CORRELATION_ID_KEY));  // per the author, also available here
                        })
                        .subscribeOn(SETTLEMENT_SCHEDULER)
                        // The inner Mono is still subscribed independently of the returned Flux.
                        .subscribe())
                // After the context has been read, continue with the Flux from the original example.
                .thenMany(
                        Flux.just(1).map(s -> {
                            thread2();
                            return 1;
                        }).subscribeOn(SETTLEMENT_SCHEDULER));