reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Enabled Automatic Context Propagation and context propagation with lift causes ClassCastException #3762

kz-dt closed this issue 6 months ago

kz-dt commented 7 months ago

Expected Behavior

No exceptions are thrown

Actual Behavior

An exception is thrown:

java.lang.ClassCastException: class reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber cannot be cast to class reactor.core.Fuseable$QueueSubscription (reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber and reactor.core.Fuseable$QueueSubscription are in unnamed module of loader 'app')
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:264)
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
    at reactor.core.publisher.FluxRefCount$RefCountInner.setRefCountMonitor(FluxRefCount.java:209)
    at reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:85)
    at reactor.core.publisher.FluxContextWriteRestoringThreadLocals.subscribe(FluxContextWriteRestoringThreadLocals.java:46)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4478)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4414)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361)
    at org.example.ReactorClassCastExceptionReproducerSample.runSample(ReactorClassCastExceptionReproducerSample.java:44)
    at org.example.ReactorClassCastExceptionReproducerSample.main(ReactorClassCastExceptionReproducerSample.java:18)

Steps to Reproduce

Reproducer is here: https://github.com/kz-dt/reactor_class_cast_exception_reproducer

    // Identity lift: hands the downstream subscriber back unchanged.
    private <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
        return Operators.lift((a, b) -> b);
    }

    @Test
    void reproCase() {
        Hooks.onEachOperator("testTracingLift", tracingLift());
        Hooks.enableAutomaticContextPropagation();

        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
        Flux<String> flux = sink
            .asFlux()
            .doOnRequest(v -> System.out.println("OnDoRequest " + v))
            .doOnTerminate(() -> System.out.println("doOnTerminate"))
            .doOnCancel(() -> System.out.println("doOnCancel"))
            .publish()
            .refCount();

        // Subscribing surfaces the ClassCastException shown above.
        Mono<List<String>> res = flux.map(s -> s + " mapped").collectList();
        res.subscribe(v -> System.out.println("Received a list of mapped strings: " + v));
    }

Possible Solution

It looks like Operators.lift returns the wrong subscriber type. Calling Hooks.onOperatorDebug() "fixes" the issue, since it adds another wrapper.

Your Environment

reactor-core 3.6.+, with the Micrometer context-propagation jar on the classpath.

The issue was originally discovered with Spring Boot 3.2+ and the Couchbase Java client 3.4.11, using the following dependencies:

    org.springframework.boot:spring-boot-starter-webflux:3.2.0
    org.springframework.boot:spring-boot-starter-data-couchbase-reactive:3.2.0

chemicL commented 7 months ago

Thank you for the report. I can confirm this is a bug. I will investigate.

chemicL commented 7 months ago

As a matter of fact, Automatic Context Propagation is not the root cause here.

Combining Flux#refCount with Operators.lift is problematic: the Subscriber of a Fuseable operator (FluxMapFuseable) is fed a non-Fuseable Subscription, namely ContextWriteRestoringThreadLocalsSubscriber, which comes from lifting the FluxRefCount operator (via FluxLift). The solution FluxLiftFuseable has for this case, using FluxHide#SuppressFuseableSubscriber, does not help.
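To make the failure mode concrete, here is a minimal plain-Java sketch. The nested types (Subscription, QueueSubscription, FuseableMapSubscriber, PlainSubscription) are hypothetical stand-ins, not the real reactor-core or Reactive Streams interfaces. They only mimic the assumption visible in the stack trace: a fuseable subscriber casts its upstream Subscription to a QueueSubscription without checking.

```java
final class FusionMismatchSketch {
    // Hypothetical, simplified stand-ins for the Subscription contracts;
    // these are NOT the real reactor-core / Reactive Streams types.
    interface Subscription {
        void request(long n);
        void cancel();
    }

    interface QueueSubscription extends Subscription {
        int NONE = 0;
        // Negotiates a fusion mode; NONE means "no fusion, use request(n)".
        int requestFusion(int requestedMode);
    }

    // Mimics a fuseable operator's subscriber: it assumes a Fuseable upstream
    // always hands it a QueueSubscription and casts without checking.
    static final class FuseableMapSubscriber {
        QueueSubscription qs;

        void onSubscribe(Subscription s) {
            this.qs = (QueueSubscription) s; // throws ClassCastException for a plain Subscription
        }
    }

    // A plain, non-fuseable Subscription, standing in for a wrapper such as the
    // lifted ContextWriteRestoringThreadLocalsSubscriber.
    static final class PlainSubscription implements Subscription {
        public void request(long n) { }
        public void cancel() { }
    }

    // Returns true when handing a plain Subscription to the fuseable subscriber
    // fails with a ClassCastException, as in the stack trace of this issue.
    static boolean castFails() {
        FuseableMapSubscriber subscriber = new FuseableMapSubscriber();
        try {
            subscriber.onSubscribe(new PlainSubscription());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

Calling FusionMismatchSketch.castFails() returns true, because the unchecked cast inside onSubscribe rejects the plain Subscription.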

Here is a simpler reproducer that demonstrates the issue without automatic context propagation.

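    // FluxMap.MapSubscriber is package-private, so this test has to live in the
    // reactor.core.publisher package (e.g. in reactor-core's own test suite).
    private <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
        // Wraps each subscriber in an identity FluxMap.MapSubscriber, which is not Fuseable.
        return Operators.lift((a, b) -> new FluxMap.MapSubscriber<>(b, i -> i));
    }

    @Test
    void reproCase() {
        Hooks.onEachOperator("testTracingLift", tracingLift());
        try {
            Flux<Integer> flux = Flux.just(1);

            flux.map(s -> s).blockLast();
        } finally {
            // Reset even if the test fails, so the hook does not leak into other tests.
            Hooks.resetOnEachOperator();
        }
    }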

Let me try to work with that.

chemicL commented 6 months ago

I dug a bit deeper. Two things:

  1. The issue mentioned above is expected: wrapping a Fuseable Subscriber with a non-Fuseable one is prohibited, and there is no way the reactor-core internals could help with that.
  2. The actual problem is that the lift operation wraps FluxRefCount, which is Fuseable, with a non-Fuseable FluxContextWriteRestoringThreadLocals. That operator provides a non-Fuseable Subscriber, which also acts as the Subscription for FluxMapFuseable.MapFuseableConditionalSubscriber. That disconnect is undesired.
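Point 1 can be illustrated with a small plain-Java sketch of the contract. All types here are hypothetical stand-ins, not reactor-core's API. Because a fuseable subscriber casts whatever it receives, a wrapper that cannot fuse still has to implement the queue-subscription interface and decline fusion by answering NONE, roughly the approach the thread mentions for FluxLiftFuseable (FluxHide#SuppressFuseableSubscriber).

```java
final class SuppressFusionSketch {
    // Hypothetical, simplified stand-ins; NOT the real reactor-core types.
    interface Subscription {
        void request(long n);
        void cancel();
    }

    interface QueueSubscription extends Subscription {
        int NONE = 0;
        int SYNC = 1;
        int requestFusion(int requestedMode);
    }

    // Hypothetical wrapper that still satisfies the QueueSubscription cast but
    // refuses every fusion request, so downstream falls back to request(n).
    static final class SuppressFusion implements QueueSubscription {
        private final Subscription upstream;

        SuppressFusion(Subscription upstream) {
            this.upstream = upstream;
        }

        public void request(long n) { upstream.request(n); }
        public void cancel() { upstream.cancel(); }
        public int requestFusion(int requestedMode) { return NONE; }
    }

    // Hypothetical fuseable subscriber: casts, then negotiates a fusion mode.
    static final class FuseableSubscriber {
        int negotiatedMode = -1;

        void onSubscribe(Subscription s) {
            QueueSubscription qs = (QueueSubscription) s; // safe: the wrapper implements QueueSubscription
            negotiatedMode = qs.requestFusion(QueueSubscription.SYNC);
            if (negotiatedMode == QueueSubscription.NONE) {
                qs.request(Long.MAX_VALUE); // no fusion: use regular demand signalling
            }
        }
    }

    // Plain upstream that records the demand reaching it.
    static final class RecordingSubscription implements Subscription {
        long requested;
        public void request(long n) { requested += n; }
        public void cancel() { }
    }

    // Returns {negotiated fusion mode, demand seen by the plain upstream}.
    static long[] run() {
        RecordingSubscription source = new RecordingSubscription();
        FuseableSubscriber downstream = new FuseableSubscriber();
        downstream.onSubscribe(new SuppressFusion(source));
        return new long[] { downstream.negotiatedMode, source.requested };
    }
}
```

Here run() returns NONE (0) as the negotiated mode, and the plain upstream receives the full Long.MAX_VALUE demand instead of a ClassCastException being thrown.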

The reason is that FluxRefCount is not properly marked as an internal operator, and it gets wrapped multiple times due to other issues with how lifting is handled. I will follow up with a PR.
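Why an operator that is not recognized as already handled ends up wrapped several times can be shown with a generic sketch. Pub, AlreadyLifted, Source, Lifted, naiveHook and guardedHook are hypothetical names for illustration only. The marker-interface guard is one common way to make decoration idempotent; it is not necessarily the change made in the PR.

```java
final class WrapOnceSketch {
    // Hypothetical publisher abstraction used only for this illustration.
    interface Pub {
        String describe();
    }

    // Hypothetical marker: publishers implementing it were already decorated
    // by the hook and must not be wrapped again.
    interface AlreadyLifted { }

    record Source(String name) implements Pub {
        public String describe() { return name; }
    }

    record Lifted(Pub source) implements Pub, AlreadyLifted {
        public String describe() { return "Lift(" + source.describe() + ")"; }
    }

    // Naive hook: wraps unconditionally, so re-applying it nests wrappers.
    static Pub naiveHook(Pub p) {
        return new Lifted(p);
    }

    // Guarded hook: leaves publishers that already carry the marker untouched.
    static Pub guardedHook(Pub p) {
        return p instanceof AlreadyLifted ? p : new Lifted(p);
    }
}
```

Applying naiveHook twice to a source named "RefCount" yields "Lift(Lift(RefCount))", while applying guardedHook twice yields a single "Lift(RefCount)".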

chemicL commented 6 months ago

One more issue is that FluxRefCount, being Fuseable, gets wrapped in a FluxLiftFuseable upon assembly. Upon creation, FluxLiftFuseable takes the source Publisher (FluxRefCount) and applies Flux.from on top of it. Flux.from wraps FluxRefCount with a non-Fuseable FluxContextWriteRestoringThreadLocals, which delivers a non-QueueSubscription downstream, and that is the problem. Furthermore, Flux.from also applies the assembly hooks again, which means that FluxContextWriteRestoringThreadLocals is itself wrapped with a FluxLift (as a non-Fuseable operator). This is a bit of a mess, unfortunately. The PR is coming...
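A toy model of the assembly sequence described in that comment may help. Everything here (Op, applyHooks, from, assemble, hasFusionMismatch) is hypothetical and only mimics the order of wrapping; it is not reactor-core's real assembly code. It builds the resulting chain and flags a wrapper that advertises fusion while its direct upstream does not.

```java
final class AssemblySketch {
    // Hypothetical model of an assembled operator: its name, whether it
    // advertises fusion support, and the upstream it wraps (null at the root).
    record Op(String name, boolean fuseable, Op source) {
        String chain() {
            return source == null ? name : name + " <- " + source.chain();
        }
    }

    // Hypothetical stand-in for the onEachOperator lift hook: picks a fuseable
    // or non-fuseable lift wrapper depending on the operator it wraps.
    static Op applyHooks(Op op) {
        return op.fuseable() ? new Op("LiftFuseable", true, op) : new Op("Lift", false, op);
    }

    // Hypothetical stand-in for Flux.from with automatic context propagation:
    // adds a non-fuseable context-restoring wrapper, then re-applies the hooks.
    static Op from(Op source) {
        return applyHooks(new Op("ContextWriteRestoringThreadLocals", false, source));
    }

    // Mirrors the order described above: the fuseable lift wrapper created for
    // RefCount runs from() on its source during construction.
    static Op assemble() {
        Op refCount = new Op("RefCount", true, null);
        return new Op("LiftFuseable", true, from(refCount));
    }

    // True if some wrapper advertises fusion while its direct upstream does not.
    static boolean hasFusionMismatch(Op op) {
        for (Op cur = op; cur.source() != null; cur = cur.source()) {
            if (cur.fuseable() && !cur.source().fuseable()) {
                return true;
            }
        }
        return false;
    }
}
```

In this model, assemble().chain() produces "LiftFuseable <- Lift <- ContextWriteRestoringThreadLocals <- RefCount", and hasFusionMismatch reports the fuseable outer wrapper sitting directly on a non-fuseable one.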

chemicL commented 6 months ago

I created a PR to address the multitude of problems that led to this report.

@kz-dt I have a question. Your answer won't change the fact that a fix is needed. However, I wonder why you combine automatic context propagation with Hooks.onEachOperator, especially as the reproducer names the lift operation tracingLift. The intention behind automatic context propagation was to eliminate the use of onEachOperator and other Hooks for tracing and for propagating correlation data across thread boundaries. Combining these two approaches can lead to errors as well as performance issues. If you don't mind, please elaborate a bit on your use case. Perhaps we can do something in terms of documentation, design, or educational material if other users have similar patterns.

kz-dt commented 6 months ago

Hi @chemicL,

Thank you for looking into the issue.

A customer uses both automatic context propagation and a separate solution that injects a Java agent and sets up a Hooks.onEachOperator handler, e.g. Dynatrace OneAgent, OTel instrumentation, or Datadog.

chemicL commented 6 months ago

@kz-dt thanks for sharing. I think I managed to resolve the possible issues. Please validate with the latest 3.6.6-SNAPSHOT.

kaliy commented 5 months ago

Hi @chemicL, do you have any ETA on the 3.6.6 release availability? Thanks

kz-dt commented 5 months ago

Hi @chemicL,

The snapshot passes the test, thank you. I also tried the 3.6.6 release from Maven Central, and it works too.

@kaliy as far as I understand, 3.6.6 was released on 2024-05-14 (see https://github.com/reactor/reactor-core/releases), but I am not part of the Reactor team.

chemicL commented 5 months ago

Thanks for confirming, @kz-dt. Glad to hear it works.

@kaliy as @kz-dt noted, 3.6.6 has been released this week as part of the 2023.0.6 Reactor BOM.