spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0

Reactor Context not propagated from Mono to other Reactive implementation when adapting using ReactiveAdapterRegistry #32841

Closed anaconda875 closed 4 months ago

anaconda875 commented 4 months ago

Affects: spring-core-6.1.1

Given this code:

    Mono.just("a")
        .flatMap(a -> {
          return Mono.deferContextual(c -> c.get("KEY"))
              .flatMap(val -> {
                System.out.println(val);
                return ReactiveWrapperConverters.toWrapper(Uni.createFrom().item("abcd"), Mono.class);
              });
        }).contextWrite(Context.of("KEY", Mono.just("VALUE")))
        .subscribe(System.out::println, t -> ((Throwable) t).printStackTrace());

Executing it prints the following to the console:

VALUE
abcd

But with this code:

    Mono.just("a")
        .flatMap(a -> {
          return Mono.deferContextual(c -> c.get("KEY"))
              .flatMap(val -> {
                System.out.println(val);
                return ReactiveWrapperConverters.toWrapper(ReactiveWrapperConverters.toWrapper(Mono.deferContextual(c -> c.get("KEY")), Uni.class).flatMap(v -> {
                  System.out.println(v);
                  return Uni.createFrom().item("abcd");
                }), Mono.class);
              });
        }).contextWrite(Context.of("KEY", Mono.just("VALUE")))
        .subscribe(System.out::println, t -> ((Throwable) t).printStackTrace());

it prints:

val
java.util.NoSuchElementException: Context is empty
    at reactor.util.context.Context0.get(Context0.java:43)
    at com.example.demo.DemoApplication.lambda$main$1(DemoApplication.java:58)
    at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
    at org.reactivestreams.FlowAdapters$FlowPublisherFromReactive.subscribe(FlowAdapters.java:366)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher$PublisherSubscriber.forward(UniCreateFromPublisher.java:41)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher.subscribe(UniCreateFromPublisher.java:26)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:73)
    at org.reactivestreams.FlowAdapters$ReactiveToFlowSubscription.request(FlowAdapters.java:182)
    at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:291)
    at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
    at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onSubscribe(FlowAdapters.java:206)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:25)
    at org.reactivestreams.FlowAdapters$ReactivePublisherFromFlow.subscribe(FlowAdapters.java:348)
    at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
    at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
    at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202)
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4496)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4478)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4414)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4386)
    at com.example.demo.DemoApplication.main(DemoApplication.java:65)

The second Mono.deferContextual(c -> c.get("KEY")) (the one passed to the inner ReactiveWrapperConverters.toWrapper call) fails because the Context it sees is empty.

This is what ReactiveWrapperConverters.toWrapper (from spring-data-commons) does:

    @Override
    @SuppressWarnings({ "ConstantConditions", "unchecked" })
    public <T> Converter<Object, T> getConverter(Class<T> targetType) {
        return source -> {

            Publisher<?> publisher = source instanceof Publisher ? (Publisher<?>) source
                    : RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(Publisher.class, source).toPublisher(source);

            ReactiveAdapter adapter = RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(targetType);

            return (T) adapter.fromPublisher(publisher);
        };
    }

simonbasle commented 4 months ago

This is expected: Reactor Context cannot be propagated through other flavours of Publisher.

In fact, "propagating" is not the best word to describe how context is accessed. In a reactive stream chain, each operator's subscriber has access to the next subscriber in line (to pass data down the chain), and in a Reactor-to-Reactor chain this means that an operator can also call CoreSubscriber#currentContext() on it. If there is a Subscriber in the middle of the chain that is not a Reactor CoreSubscriber, it prevents access to the Context defined further down the line.
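
To make the lookup mechanism concrete, here is a minimal sketch that models it with plain java.util.concurrent.Flow types. It is not Reactor's implementation: ContextAwareSubscriber, ForwardingStage, AwareStage, PlainStage, ContextHolder and contextOf are all hypothetical names invented for this illustration, with ContextAwareSubscriber standing in for CoreSubscriber and PlainStage standing in for a subscriber from another library (such as the Mutiny stages in the stack trace above). The assumption is that a context-aware stage resolves the context by asking the next subscriber, and falls back to an empty context when that subscriber cannot answer.

```java
import java.util.Map;
import java.util.concurrent.Flow;

public class ContextLookupSketch {

    /** Hypothetical stand-in for Reactor's CoreSubscriber: a subscriber that can expose a context. */
    interface ContextAwareSubscriber<T> extends Flow.Subscriber<T> {
        Map<String, String> currentContext();
    }

    /** Base stage that forwards every signal to the next subscriber in line. */
    static class ForwardingStage<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        ForwardingStage(Flow.Subscriber<? super T> downstream) { this.downstream = downstream; }
        @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
        @Override public void onNext(T item) { downstream.onNext(item); }
        @Override public void onError(Throwable t) { downstream.onError(t); }
        @Override public void onComplete() { downstream.onComplete(); }
    }

    /** Context-aware stage: resolves the context by asking the next subscriber. */
    static class AwareStage<T> extends ForwardingStage<T> implements ContextAwareSubscriber<T> {
        AwareStage(Flow.Subscriber<? super T> downstream) { super(downstream); }
        @Override public Map<String, String> currentContext() { return contextOf(downstream); }
    }

    /** Models a foreign library's stage: forwards signals but has no notion of context. */
    static class PlainStage<T> extends ForwardingStage<T> {
        PlainStage(Flow.Subscriber<? super T> downstream) { super(downstream); }
    }

    /** End of the chain: the subscriber that actually holds the context. */
    static class ContextHolder<T> implements ContextAwareSubscriber<T> {
        private final Map<String, String> context;
        ContextHolder(Map<String, String> context) { this.context = context; }
        @Override public Map<String, String> currentContext() { return context; }
        @Override public void onSubscribe(Flow.Subscription s) { }
        @Override public void onNext(T item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Returns the subscriber's context if it is context-aware, otherwise an empty one. */
    static Map<String, String> contextOf(Flow.Subscriber<?> subscriber) {
        if (subscriber instanceof ContextAwareSubscriber) {
            return ((ContextAwareSubscriber<?>) subscriber).currentContext();
        }
        return Map.of();
    }

    /** Every stage is context-aware, so the lookup reaches the holder. */
    static Map<String, String> lookupThroughAwareChain() {
        Flow.Subscriber<String> end = new ContextHolder<>(Map.of("KEY", "VALUE"));
        Flow.Subscriber<String> inner = new AwareStage<>(end);
        AwareStage<String> outer = new AwareStage<>(inner);
        return outer.currentContext();
    }

    /** A context-unaware stage sits in the middle, so the lookup stops there. */
    static Map<String, String> lookupThroughForeignStage() {
        Flow.Subscriber<String> end = new ContextHolder<>(Map.of("KEY", "VALUE"));
        Flow.Subscriber<String> inner = new AwareStage<>(end);
        Flow.Subscriber<String> foreign = new PlainStage<>(inner);
        AwareStage<String> outer = new AwareStage<>(foreign);
        return outer.currentContext();
    }

    public static void main(String[] args) {
        System.out.println(lookupThroughAwareChain());   // prints {KEY=VALUE}
        System.out.println(lookupThroughForeignStage()); // prints {}
    }
}
```

In this model the data signals still flow through PlainStage untouched. Only the side channel used for the context lookup is cut, which matches the symptom in the report: the value is still emitted through the converted chain, but the inner deferContextual sees an empty Context.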