reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

FluxGroupBy silently drops onNext signals #2352

Open Sage-Pierce opened 4 years ago

Sage-Pierce commented 4 years ago

I have a use case where I'm using Flux::groupBy with a relatively high cardinality of grouping (1024-4096 concurrent groups at any given time). FWIW, the use case itself is similar to a sort of deduplication functionality on an infinite stream.

Each "group" is limited in time (.take(Duration)) and size (.take(Long)) and collected as a List. Under high load, it appears some items are being dropped.

Thus far, I think I can point to where the relevant pieces of code are:

Expected Behavior

Items are not dropped by groupBy operator

Actual Behavior

Items emitted from upstream of groupBy operator are not emitted downstream, and are silently being dropped.

Steps to Reproduce

I wrote the following test to show a simplified version of what's going on with my use case:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(Schedulers.elastic())
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L))
                .take(2)
                .collectList(), 16384)
            .doOnNext(batch -> {
                try {
                    Thread.sleep(1L);
                } catch (Exception e) {

                }
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

Interestingly, the test sometimes passes without the doOnNext with Thread.sleep(1L). I had to add it to make the test fail consistently (and to mimic the small real-world processing overhead of each downstream item).

Possible Solution

I'm not sure. Still trying to figure out exactly what the problem is in groupBy

Your Environment

Sage-Pierce commented 4 years ago

I think I've isolated this particular unexpected behavior to being the result of a race condition in FluxGroupBy between draining and cancellation on UnicastGroupedFlux.

I think I've figured out a workaround that both mitigates my issue and verifies the race condition. If I isolate both publish and subscribe operations on FluxGroupBy with the same Worker (Thread) that's responsible for the take(Duration) scheduling (on the GroupedFlux instances), the problem goes away:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.single(Schedulers.elastic());

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .subscribeOn(scheduler, true)
            .doOnNext(batch -> {
                try {
                    Thread.sleep(1L);
                } catch (Exception e) {

                }
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

I suppose my question to Maintainers is "what should be the general expected behavior in the absence of this Scheduler isolation?"

At the very least, silent dropping of onNext items seems like a bug.

I think it may be debatable whether or not item dropping should be possible in this situation. If so, such behavior may be a surprise to clients, especially without decent knowledge of how publish and subscribe operation threading may cause this condition

bsideup commented 3 years ago

@Sage-Pierce you seem to be blocking the non-blocking threads. Maybe you wanted publishOn, not subscribeOn before doOnNext?

bsideup commented 3 years ago

When I remove Thread.sleep from doOnNext, everything seems to be fine. Since calling Thread.sleep there is incorrect, @Sage-Pierce please provide a reproducer that does not rely on it.

bsideup commented 3 years ago

> FWIW, the use case itself is similar to a sort of deduplication functionality on an infinite stream.

For deduplicating an infinite stream, I'd recommend something like this (in case Flux#distinct does not work for you):

    Flux.<Integer>generate(sink -> sink.next(ThreadLocalRandom.current().nextInt(0, 100)))
        .log("producer")
        .transformDeferred(flux -> {
            Set<Integer> items = new HashSet<>();
            return flux.filter(items::add);
        })
        .log("next")
        .take(100)
        .blockLast();

Note that the Set of items may produce OOM as well due to it being infinite. Consider using some Cache instead (https://github.com/ben-manes/caffeine or Guava) that supports TTLs / eviction.
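One way to bound the memory of the Set-based filter without adding a dependency is a size-capped "seen" set. The sketch below is plain Java and rests on assumptions: `BoundedSeenSet` and `create` are hypothetical names, eviction is by insertion order rather than by TTL, and the set is not thread-safe. It is not a substitute for what Caffeine or Guava offer.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Reactor or of the code above.
final class BoundedSeenSet {

    // Returns a Set that remembers at most maxEntries elements.
    // When a new element pushes the size past the cap, the element
    // that was inserted earliest is evicted.
    static <T> Set<T> create(int maxEntries) {
        return Collections.newSetFromMap(new LinkedHashMap<T, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<T, Boolean> eldest) {
                return size() > maxEntries;
            }
        });
    }

    public static void main(String[] args) {
        Set<Integer> seen = create(2);
        System.out.println(seen.add(1)); // true: first time seen
        System.out.println(seen.add(2)); // true: first time seen
        System.out.println(seen.add(1)); // false: duplicate, filtered out
        System.out.println(seen.add(3)); // true: new element, evicts 1
        System.out.println(seen.add(1)); // true: 1 was evicted, so it passes again
    }
}
```

Passing `BoundedSeenSet.create(10_000)` in place of `new HashSet<>()` in the `transformDeferred` lambda would cap memory, at the cost of letting a duplicate through once its key has been evicted.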

Sage-Pierce commented 3 years ago

Thank you for the comments and suggestions, @bsideup!

> you seem to be blocking the non-blocking threads. Maybe you wanted publishOn, not subscribeOn before doOnNext?

I'll update the failure scenario code (below) to use blocking threads

> when I remove Thread.sleep from doOnNext everything seems to be fine

Although this is indeed blocking, the sleep call is present to mimic a small (1ms) overhead of processing downstream items. In actual usage, the process does some non-trivial computationally-bounded work that, IMO, isn't relevant to the issue that I think is being exposed. However, I believe the overhead itself is relevant due to increasing the likelihood of racing between publish and subscribe methods of FluxGroupBy (when not delegated to same Worker/Thread).

Here's an example without sleep, but with analogous processing overhead. Note that the test fails or succeeds based on whichever Scheduler configuration is used:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.elastic(); // Fails
        // Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .subscribeOn(scheduler, true)
            .doOnNext(batch -> {
                // Mimic real-world computationally-bound processing overhead
                long startNano = System.nanoTime();
                while (System.nanoTime() - startNano < 1_000_000) ;
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

> For deduplicating an infinite stream, I'd recommend something like this ... using some Cache instead (https://github.com/ben-manes/caffeine or Guava) that supports TTLs / eviction

Using caches with TTLs and eviction is an interesting path to consider. However, there are a few problems with substituting it for the deduplication functionality I'm accomplishing with Reactor. For one, there would be less memory efficiency due to keeping items in the caches longer than they actually need to be. For another, the behavior is slightly different; in my case, when there are items/entities with duplicate IDs in the same window, I want to emit the latest item once the window expires. The provided Set-based substitute would emit the first/earliest item. There might be a way to implement emission of the latest items if Caffeine or Guava provides something like onEviction or onExpire callbacks, but I'm partial to the more-readable style that the Reactor code provides (and one less library dependency 😄).
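The difference between the two behaviors can be shown without Reactor. In this sketch, `WindowDedup`, `Item`, `firstPerId`, and `latestPerId` are hypothetical names, and a window is modeled as an already-collected List. It illustrates the semantics only, not how either approach is wired into a stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of first-wins versus last-wins deduplication.
final class WindowDedup {

    // Hypothetical entity: an ID plus a version that grows with each update.
    static final class Item {
        final String id;
        final int version;

        Item(String id, int version) {
            this.id = id;
            this.version = version;
        }

        @Override
        public String toString() {
            return id + "#" + version;
        }
    }

    // Set-based filter: the first item seen for each ID wins.
    static List<Item> firstPerId(List<Item> window) {
        Set<String> seen = new HashSet<>();
        List<Item> out = new ArrayList<>();
        for (Item item : window) {
            if (seen.add(item.id)) {
                out.add(item);
            }
        }
        return out;
    }

    // Window-based collection: later items overwrite earlier ones,
    // so the last item seen for each ID wins.
    static List<Item> latestPerId(List<Item> window) {
        Map<String, Item> latest = new LinkedHashMap<>();
        for (Item item : window) {
            latest.put(item.id, item);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Item> window = Arrays.asList(
                new Item("a", 1), new Item("b", 1), new Item("a", 2));
        System.out.println(firstPerId(window));  // [a#1, b#1]
        System.out.println(latestPerId(window)); // [a#2, b#1]
    }
}
```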

bsideup commented 3 years ago

@Sage-Pierce I still see .subscribeOn(scheduler, true) in the code - is it intentional? The problem is that you're blocking the parallel scheduler and it may be that the delays you're using are not getting executed hence the lost items

Sage-Pierce commented 3 years ago

> I still see .subscribeOn(scheduler, true) in the code - is it intentional? The problem is that you're blocking the parallel scheduler

Yes, the subscribeOn is intentional, so that groupBy subscription methods are executed with the same Worker that executes upstream publishing and GroupedFlux take cancelling. TBH, I mainly have it there since my real-world usage has further asynchronous boundaries (publishOn) downstream of subscribeOn. That being said, I can remove the subscribeOn altogether and still get the same behavior, so it can be removed if it makes debugging easier.

I could certainly be wrong, but I don't think the latest code should be blocking on the parallel Scheduler anymore, since it explicitly specifies a non-parallel Scheduler 🤔

Sage-Pierce commented 3 years ago

FWIW, code without subscribeOn:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.elastic(); // Fails
        //Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .doOnNext(batch -> {
                // Mimic real-world computationally-bound processing overhead
                long startNano = System.nanoTime();
                while (System.nanoTime() - startNano < 1_000_000) ;
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

bsideup commented 3 years ago

@Sage-Pierce this code still blocks the thread (by running the heavy computations). Consider using publishOn before doOnNext

Sage-Pierce commented 3 years ago

@bsideup I'll go ahead and add the publishOn; however, I'm not certain I understand the value it provides as far as diagnosing the issue 🤔 I feel like it would at best seem to mask it by working around whatever/wherever the problem is. However, I could again be missing something.

In any case, with the publishOn, I get more interesting behavior. The test either hangs or succeeds with the following code:

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.elastic(); // Hangs
        // Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .publishOn(scheduler)
            .doOnNext(batch -> {
                // Mimic real-world computationally-bound processing overhead
                long startNano = System.nanoTime();
                while (System.nanoTime() - startNano < 1_000_000) ;
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

With the following code, it either hangs or intermittently hangs or fails, depending on the Scheduler (note the different Scheduler in the downstream publishOn):

    @Test
    public void test() throws Exception {
        // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Scheduler scheduler = Schedulers.elastic(); // Hangs
        // Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Flaky hang-fails

        Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
            .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
                4096)
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
                .take(2)
                .collectList(), 16384)
            .publishOn(Schedulers.newParallel("test"))
            .doOnNext(batch -> {
                // Mimic real-world computationally-bound processing overhead
                long startNano = System.nanoTime();
                while (System.nanoTime() - startNano < 1_000_000) ;
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
    }

Interestingly, the hanging behavior seems similar to the issue discussed in #2138.

bsideup commented 3 years ago

Although I haven't fixed the issue yet, I think I got a much simpler reproducer:

    final int total = 100;

    Long count = Flux.range(0, total)
                     .groupBy(i -> (i / 2) * 2, 42)
                     .flatMap(it -> it.take(1), 2)
                     .publishOn(Schedulers.parallel(), 2)
                     .count()
                     .block(Duration.ofSeconds(60));
    assertThat(total - count).as("count").isZero();

fails with:

    Expected :0
    Actual   :44
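The arithmetic behind this reproducer can be checked in plain Java. The sketch below only mirrors the key function `i -> (i / 2) * 2` over the same range; `GroupKeyArithmetic` and `groupSizes` are hypothetical names and no Reactor code is involved.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper that counts how many inputs land on each group key.
final class GroupKeyArithmetic {

    // Applies the reproducer's key function to 0..total-1 and counts values per key.
    static Map<Integer, Long> groupSizes(int total) {
        return IntStream.range(0, total)
                .boxed()
                .collect(Collectors.groupingBy(
                        i -> (i / 2) * 2, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        int total = 100;
        Map<Integer, Long> sizes = groupSizes(total);
        System.out.println("distinct keys: " + sizes.size()); // distinct keys: 50

        // The failing assertion reported total - count = 44.
        int reportedDifference = 44;
        System.out.println("observed count: " + (total - reportedDifference)); // observed count: 56
    }
}
```

The 100 inputs map to 50 keys with two values each. Since the assertion compares `total - count` with zero, the reported `Actual :44` corresponds to a count of 56, that is, 44 values never reached `count()`.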

simonbasle commented 2 years ago

@Sage-Pierce since I see that you've reworked your code, and since we've made other changes to groupBy in the past year, can this issue be closed?

Sage-Pierce commented 2 years ago

@simonbasle I'm not sure I can say this isn't still an issue. Even with Reactor 3.4.12 I can still observe silently dropped emissions with the following simplified example:

    @Test
    public void upstreamEmissionsShouldMatchDownstream() throws Exception {
        Hooks.onNextDroppedFail();
        int numGroups = 8;

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CompletableFuture<Void> future = new CompletableFuture<>();

        Flux.generate(sink -> sink.next(UUID.randomUUID()))
            .take(Duration.ofSeconds(30L))
            .doOnNext(next -> upstream.incrementAndGet())
            .groupBy(uuid -> Math.abs(uuid.hashCode() % numGroups))
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L)), numGroups)
            .subscribe(next -> downstream.incrementAndGet(), future::completeExceptionally, () -> future.complete(null));

        future.get();
        System.out.println("Emitted: " + upstream.get());
        assertEquals(upstream.get(), downstream.get());
    }

I believe the problem is the time-based take on the "groups" from groupBy. What this appears to cause is a race condition between upstream item emissions and group cancellation (which happens on a different thread than the upstream emissions). If the .take(Duration.ofSeconds(1L)) is replaced with something non-timer-oriented, like .take(2), the test works fine.

To your point, though, I have found a workaround for this by ensuring that upstream publishing and group cancellation happen on the same thread by using a single-Worker Scheduler. The following modified example causes the test to pass:

    @Test
    public void upstreamEmissionsShouldMatchDownstream() throws Exception {
        Hooks.onNextDroppedFail();
        int numGroups = 8;
        Scheduler scheduler = Schedulers.single(Schedulers.boundedElastic());

        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CompletableFuture<Void> future = new CompletableFuture<>();

        Flux.generate(sink -> sink.next(UUID.randomUUID()))
            .take(Duration.ofSeconds(30L))
            .publishOn(scheduler)
            .doOnNext(next -> upstream.incrementAndGet())
            .groupBy(uuid -> Math.abs(uuid.hashCode() % numGroups))
            .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler), numGroups)
            .subscribe(next -> downstream.incrementAndGet(), future::completeExceptionally, () -> future.complete(null));

        future.get();
        System.out.println("Emitted: " + upstream.get());
        assertEquals(upstream.get(), downstream.get());
    }
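Why confining emission and cancellation to one Worker would avoid the drop can be illustrated with a toy model. Everything in this sketch is hypothetical (`ToyGroupRace`, `Group`, `lookup`, `deliver`, `cancel`), and it is not how FluxGroupBy is implemented. It replays, deterministically and on one thread, a check-then-act interleaving of the kind suspected above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: a map of groups, an emitter, and a timer-driven cancel.
final class ToyGroupRace {

    static final class Group {
        boolean cancelled;
        final List<String> received = new ArrayList<>();
    }

    final Map<Integer, Group> groups = new HashMap<>();
    int delivered;
    int dropped;

    // Emission, step 1: find the group for a key, creating it if absent.
    Group lookup(int key) {
        return groups.computeIfAbsent(key, k -> new Group());
    }

    // Emission, step 2: hand the item to the group found in step 1.
    void deliver(Group group, String item) {
        if (group.cancelled) {
            dropped++; // nobody is listening to this group anymore
        } else {
            group.received.add(item);
            delivered++;
        }
    }

    // What the timer does in this model when the time-based take expires.
    void cancel(int key) {
        Group group = groups.remove(key);
        if (group != null) {
            group.cancelled = true;
        }
    }

    // Returns {delivered, dropped} for two emissions with one cancellation.
    static int[] run(boolean cancelBetweenLookupAndDeliver) {
        ToyGroupRace model = new ToyGroupRace();
        model.deliver(model.lookup(0), "first");
        if (cancelBetweenLookupAndDeliver) {
            Group stale = model.lookup(0); // emitter resolves the group
            model.cancel(0);               // timer cancels it in between
            model.deliver(stale, "second"); // item goes to a cancelled group
        } else {
            model.cancel(0);                        // cancel runs to completion first
            model.deliver(model.lookup(0), "second"); // a fresh group is created
        }
        return new int[] {model.delivered, model.dropped};
    }

    public static void main(String[] args) {
        int[] interleaved = run(true);
        int[] serialized = run(false);
        System.out.println(interleaved[0] + " delivered, " + interleaved[1] + " dropped"); // 1 delivered, 1 dropped
        System.out.println(serialized[0] + " delivered, " + serialized[1] + " dropped");   // 2 delivered, 0 dropped
    }
}
```

In this model, running the emitter and the timer on one thread makes the first interleaving impossible, because `lookup` and `deliver` can no longer be separated by a `cancel`.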

I still suspect users would be scratching their heads over the case where items are dropped. At the very least, I would expect the dropped onNext signals to result in an error due to the onNextDroppedFail hook. Ideally, however, I think it would be preferable that items are not dropped at all.

chemicL commented 1 month ago

This issue has some apparent history and various references. Is it worth looking into in 2024 @Sage-Pierce ? There is a current attempt (#3872) to document some halting issues with groupBy, probably unrelated. But I wonder what we should do about this one.

chemicL commented 3 weeks ago

@Sage-Pierce please reopen if required. I'm closing this as it's quite dated.

Sage-Pierce commented 3 weeks ago

Heya @chemicL, sorry I meant to get back to you on this last week. IMO, this is still a demonstrable bug in 2024. It would be great if a solution could be found for it, but it remains beyond me to figure out what that solution is.

Given that there is a workaround, and given lack of progress on this, it's clearly not an urgent bug. However, if anyone else comes along and wants to take a crack at it, I wonder if the issue should remain open for them

chemicL commented 3 weeks ago

Thanks. Reopening.