reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

MonoCacheInvalidateIf can cause an NPE on cancellations #3907

Status: Closed (closed by nicholashagen 3 weeks ago)

nicholashagen commented 1 month ago

The MonoCacheInvalidateIf operator (used via cacheInvalidateIf) can throw an NPE when a subscription occurs after a cancellation. This can potentially leave the operator in a broken state.

Expected Behavior

Cancellations should not cause an NPE and the flow should fail/cancel as expected.

Actual Behavior

An NPE is thrown and propagates until it is handled by an error operator.

Steps to Reproduce

    @Test
    public void testCacheInvalidateIf() throws Exception {

        AtomicReference<Throwable> error = new AtomicReference<>();

        // create a mono that defers to some action
        // and flat maps into something that uses cache invalidation
        Mono<String> mono = Mono.defer(() -> {
            return Mono.just("foo");
        }).flatMap(x -> {
            // induce fake latency simulating a thread being paused
            // until a cancellation occurs
            try { Thread.sleep(200); } catch (Exception e) {}
            return Mono.fromSupplier(() -> {
                return "foobar";
            })
            .subscribeOn(Schedulers.boundedElastic())
            .cacheInvalidateIf(c -> true);
        })
        .subscribeOn(Schedulers.boundedElastic())
        .doOnNext(System.out::println)
        .doOnError(error::set);

        // subscribe, induce some latency (before the latency in the flat map) and then cancel
        Disposable d = mono.subscribe();
        Thread.sleep(100);
        d.dispose();

        // let threads complete
        Thread.sleep(500);

        // verify
        Assertions.assertNull(error.get());
    }

Possible Solution

In the remove function, this.upstream.cancel() is invoked. When the source is subscribed to, onSubscribe is called and sets UPSTREAM to the current subscription. However, if a cancellation arrives first, remove is called before onSubscribe has had a chance to run, so upstream is still null and the cancel() call throws the NPE. Also, I'm not sure whether the upstream field should be read directly rather than doing a get on the UPSTREAM atomic instance.
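To illustrate the race described above, here is a minimal sketch in plain Java of one common way to make a cancel that arrives before onSubscribe safe. It is not the actual MonoCacheInvalidateIf code and not necessarily what the eventual fix does; `SimpleSubscription`, `UpstreamHolder`, and the `CANCELLED` sentinel are hypothetical names introduced only for this example.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for a subscription; not Reactor's or Reactive Streams' type.
interface SimpleSubscription {
    void cancel();
}

// Hypothetical holder illustrating the race; an assumption, not the operator's real class.
class UpstreamHolder {
    // Sentinel meaning "cancel already happened", so a late onSubscribe can be rejected.
    static final SimpleSubscription CANCELLED = () -> { };

    volatile SimpleSubscription upstream;

    static final AtomicReferenceFieldUpdater<UpstreamHolder, SimpleSubscription> UPSTREAM =
            AtomicReferenceFieldUpdater.newUpdater(
                    UpstreamHolder.class, SimpleSubscription.class, "upstream");

    // Called when the source hands over its subscription.
    // Returns false, and cancels the incoming subscription, if the slot is
    // already taken (for example because cancel() ran first).
    boolean onSubscribe(SimpleSubscription s) {
        if (UPSTREAM.compareAndSet(this, null, s)) {
            return true;
        }
        s.cancel();
        return false;
    }

    // Safe to call before onSubscribe: swaps in the sentinel atomically and
    // only cancels a real subscription if one was actually stored.
    void cancel() {
        SimpleSubscription previous = UPSTREAM.getAndSet(this, CANCELLED);
        if (previous != null && previous != CANCELLED) {
            previous.cancel();
        }
    }
}
```

The point of the sketch is that the field is never dereferenced directly: cancel() reads and replaces it in one atomic step, so a null value (no subscription yet) is handled explicitly, and a subscription that shows up after the cancellation is cancelled immediately instead of being used.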

Your Environment

Java 17, macOS, JUnit 5

chemicL commented 3 weeks ago

Hello @nicholashagen, thank you for the report and the reproducer! I am now looking into this issue and will post an update once I have some findings. At this point I can confirm this is a bug that needs addressing.

chemicL commented 3 weeks ago

@nicholashagen #3914 aims to resolve the issue. I adapted your reproducer to make sure it always triggers the problem and also added a validation that the upstream subscription does not happen in such a case.