reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0
4.93k stars 1.19k forks source link

No emissions from other Flux in mergeWith().groupBy() #570

Closed westonlast closed 7 years ago

westonlast commented 7 years ago
Flux<Type> first = Flux.generate(Consumer);
Flux<Type> second = Flux.create(Consumer);

Flux<GroupedFlux<Key, Type>> groupedFluxFlux = first.mergeWith(second).groupBy(type -> {
    //emissions from "second" never arrive
});

groupedFluxFlux.subscribe(groupedFlux -> {
    //do something with group
    Mono<Type> mono = groupedFlux.take(2).reduce( ... );
    mono.block();
});

I understand that "second" is a "cold" publisher which needs a subscription to start emitting. Why doesn't downstream subscription cause an upstream subscription to both of the merged "first" and "second?"

Emissions from "second" occur if I subscribe() to it directly but of course that drains the Flux, which I don't want. I've tried turning "second" into a "hot" ConnectableFlux by calling publish(), subscribe(), and then connect(). Nothing works. What's wrong?

simonbasle commented 7 years ago

@westonlast any way you can reproduce with a Consumer implementation that you can share? ie reproducible self-contained test case?

I see several potential problems here:

groupFluxFlux.flatMap(group -> group.take(2).reduce(...))
             .subscribe(System.out::println); //for instance
akarnokd commented 7 years ago

You have to consume all the groups as consuming the groups triggers additional upstream requests. If one stalls (due to block()) and the upstream would like to emit into distinct groups including the blocked group, you get hangs.

westonlast commented 7 years ago

Thank you for your help.

I'll try flatMap().

"reduce() can only work with a full and finished sequence" - But I'm forcing the sequence that reduce() sees to finish with take(2), no? Shouldn't reduce() then apply as soon as the first two elements are available? And no, neither source Flux ever complete(), just error().

"You have to consume all the groups as consuming the groups triggers additional upstream requests" - It sounds like there is a missing piece of documentation, then: groupBy(), Reference Guide, or both. Something should specify requirements (restrictions) for a subscribe()/GroupedFlux subscription. What's the difference between a block() and a long execution? Hopefully by using flatMap(), I can avoid blocking, which appears to be better (except I was using the blocking to set a timeout...). Trying it out...

simonbasle commented 7 years ago

@westonlast that is indeed correct, take(2).reduce() should be fine. flatMap is definitely better and avoids blocking. You can always fine tune timeouts with .timeout(), and/or wait for the whole flatmap to finish inside blocking code by calling blockLast() instead of subscribe().

simonbasle commented 7 years ago

About the necessity of consuming all groups in a non-blocking fashion, not doing so causes a deadlock: GroupedFlux dispatches the first value into group A, at which point your subscriber says "ok, I'll block until I get a second value and can reduce the two". What @akarnokd says is that it effectively blocks the GroupedFlux, which is then unable to dispatch further values, so your subscriber never gets its second value and everything hangs.

westonlast commented 7 years ago

Without knowing the implementation details, I was assuming that the subscribe() callbacks would occur (logically, maybe not physically) in parallel as soon as possible when new GroupedFlux arrived. Your description makes sense, it's just that I'm not sure it's possible to know as a user given the current documentation. But I guess since Reactor is concurrency-agnostic, the base case is single-threaded, and a thread can't interrupt itself. Wouldn't hurt to make subscribe() requirements exceedingly clear, though.

simonbasle commented 7 years ago

It is not only true of subscribe but any Reactor or even RxJava operator that takes a lambda expression. In reactive programming, blocking is evil squared 👿 Since everything is tailored to be asynchronous, you're potentially not only blocking that particular lambda or callback but the whole reactive chain.

I could probably add an entry in the reference guide, eg. in intro to reactive programming section, to warn about that.

westonlast commented 7 years ago

What if my callback takes a (relatively) long time? How long can it take before data is lost?

I guess the limitation is the buffers, so memory. Never mind.

simonbasle commented 7 years ago

When you have no other way around it, you can exceptionally deal with blocking code by isolating it in a separate execution context by using .publishOn(Schedulers.elastic()) just before the operator that will execute a blocking callback. This will use a thread from a pool for each subscriber. Note the rest of the chain will continue execution on said thread, unless you change execution context again with publishOn.

westonlast commented 7 years ago

No luck. I can't get the "second" emissions to arrive.

Flux<Type> first = Flux.generate(Consumer);
Flux<Type> second = Flux.create(Consumer);

Flux<GroupedFlux<Key, Type>> groupedFluxFlux = first.mergeWith(second).groupBy(type -> {
    //emissions from "second" never arrive, just "first"
});

groupedFluxFlux.flatMap(groupedFlux -> {
    return groupedFlux.take(2).reduce(...);
}).subscribe(type -> {
   System.out.println("someday...");
});

FluxSink.next() is being called inside "second." I've even thrown some publishOn(Schedulers.elastic()) on a few places. Any idea?

Another thing that's been bugging me which might be related: When exactly can I expect the Flux.create(Consumer) callback to be executed? It seems to happen as late as possible.

simonbasle commented 7 years ago

Any way for you to put a reproduction case together, with a simplified but fully defined generate and create (eg. with a simple type like String)? I'm not sure what happens in your generate and create, which could be key.

When you subscribe to the groupedFluxFlux.flatMap, flatmap request data to the groupedFlux that itself starts requesting data from the merge, so both sources should get requested at the same time. You can chain a .log("generate") and .log("create") to first and second respectively and that might give some insight.

westonlast commented 7 years ago
import reactor.core.publisher.Flux;

interface Callback{
    void method();
}

public class Emitter{
    public static void main(String[] args){
        Flux<String> first = Flux.generate(synchronousSink -> {
            synchronousSink.next("1");
            sleep(2500);
        });

        Emitter emitter = new Emitter();
        Flux<String> second = Flux.create(fluxSink -> {
            emitter.start(() -> {
                fluxSink.next("2");
            });
        });

        first.mergeWith(second).groupBy(string -> {
            System.out.println(string);
            return 0;
        })
        .flatMap(groupedFlux -> {
            return groupedFlux.take(2).reduce((a, b) -> {
                return a;
            });
        })
        .subscribe(string -> {
            System.out.println("goal");
        });
    }

    public void start(Callback callback){
        new Thread(() -> {
            while(true){
                callback.method();
                sleep(3000);
            }
        })
        .start();
    }

    public static void sleep(long millis){
        try{
            Thread.sleep(millis);
        }
        catch(InterruptedException e){
            e.printStackTrace();
        }
    }
}

Thank you again for your help. Comment the two sleep() calls for the same result.

westonlast commented 7 years ago

After all of this time staring at this, I see a problem with my actual code, which is related to my question above, "When exactly can I expect the Flux.create(Consumer) callback to be executed?"

I need Emitter.start() to be executed before I begin processing "first." I need non-lazy, "eager" Consumer binding (which is what I assumed would happen when I initially started using the API).

westonlast commented 7 years ago

For some reason this works:

first = first.subscribeOn(Schedulers.single());

Somehow running the SynchronousSink ("first") on its own thread frees up the main thread to listen for callbacks from FluxSink ("second"). Huh?

As a user, I assumed (again! how dare I) that each Flux builder method call incrementally built up a data structure (including asynchronous buffer queues), and then subscribe() looped around this built-up data structure, filling synchronous queues ("first") and consuming asynchronous queues ("second"). It doesn't sound very "reactive," but it seemed like the obvious solution without requiring the user to specify Schedulers. How is Reactor "concurrency-agnostic" if, as a user, I have to be aware of creating enough threads for the Publishers, else no emissions?

simonbasle commented 7 years ago

As a user, you mainly have to be aware that mixing blocking behavior and non-blocking one raises a big red flag and can lead to worse locking than with imperative programming (since a lot of resources are shared by default).

Reactor being concurrent-agnostic means that the library doesn't make any strongly opinionated choices for you in terms of execution context, but rather gives you the tools to manage it yourself (so by definition you have to be a minimum aware of what you are doing, which is a tradeoff).

westonlast commented 7 years ago

"doesn't make any strongly opinionated choices" - Interesting, because that statement in itself is an opinion. :)

My point is I was aware of what I was doing (I'm oh so aware), but Reactor chose to be implemented differently. Why can't I run Reactor unconditionally from the main thread? It seems like Reactor has chosen not to support the simplest (simplest typically isn't optimal, but I don't care about optimizing... yet) case. Opportunity for better documentation, at least.

Can you speak to any of my open questions above? Lazy versus eager and the surprising subscribeOn(single) behavior in this example in detail (especially the "huh?" part)? Any thoughts about the complete code example above before my floundering comments?

simonbasle commented 7 years ago

trying to answer the "huh" part:

  1. each time you subscribe, a chain of Subscriber is built (about one per operator in your Flux chain + your final Subscriber built from the lambdas you provide). For cold sources this goes up to generating a new source, which generates a new set of data. For hot sources, it reattaches the chain to the source, letting it receive new events.
  2. by default, the whole pipeline is executed on the thread calling subscribe
  3. if you place a subscribeOn(s1) anywhere in your chain of operators, the whole pipeline executes on the thread defined by s1 instead (it is the same as above actually, it's just that this operator changes where the subscribe happens basically...)
  4. if you place a publishOn(s2) in your operator chain, upon reaching that part of the pipeline, the rest of the pipeline will execute on the thread defined by s2.
  5. This is also true for combining operators, that create a third Flux which internally subscribes to the sources it works with. For instance, groupBy does so in the current thread, and also attempts to dispatch incoming data in their correct group on the same thread...

Operators strive to reuse the same thread they received their input from.

So basically in your example, you are subscribing 3 different Flux in the main thread (generated source, create source and groupBy), then the generate source executes a sleep. That pauses the main thread. The problem is that groupBy is also blocked by this, and cannot further dispatch data. If it cannot dispatch data, it cannot gradually request more either, and everything grinds to an halt.

westonlast commented 7 years ago

Thank you, you've been extremely helpful.

In the example code, the sleep() calls are irrelevant, other than slowing it down and representing arbitrary work. You can remove them and the observed sequence is the same (ones only, no twos).

I thought Flux.create() would receive emissions unconditionally into a thread-safe queue, and the main thread would loop around, generate()-ing and then dequeue-ing. Or maybe dequeue-ing more first if the generate() couldn't keep up (the whole "backpressure" thing). "Wow, this queue is filling up faster than I can synchronously generate(). Let's dequeue a bunch before I generate again to "balance" the load (technically avoid OutOfMemoryException, maybe also signal Emitter to slow down)."

In the subscribeOn() scenario, if subscribe() flows back through the mergeWith() to the two upstream sources, why does "first" emit a single element before Emitter.start() is called? Why is the subscription to "second" lazy?

Why are these two uses of subscribeOn() different?:

first = first.subscribeOn(Schedulers.single());
...
//first.mergeWith().groupBy().flatMap()
...
flatMapped.subscribeOn(Schedulers.single());

Is it because subscribeOn()'s affect applies only within the context of a single Flux source, and groupBy() creates a distinct third source which contains the previous two, as you mentioned? So some operators, like groupBy(), are execution delimiters and containers ("thread boundaries")?

simonbasle commented 7 years ago

After looking a bit more into it, and how to explain the behavior, it all boils down to merge. I hope the following explanation gives you a clearer and deeper picture of what happens.

This simplified example stripped of groupBy still demonstrates the same behavior you were puzzled about:

@Test
    public void test570TwoFastPublishers() {
        LongAdder subscriptionCount = new LongAdder();
        AtomicLong firstCount = new AtomicLong();
        AtomicLong secondCount = new AtomicLong();

        Flux<Integer> first = Flux.just(1)
                                  .repeat()
                                  .doOnNext(v -> firstCount.incrementAndGet())
                                  .doOnSubscribe(s -> subscriptionCount.increment());

        Flux<Integer> second = Flux.just(2)
                                   .repeat()
                                   .doOnNext(v -> secondCount.incrementAndGet())
                                   .doOnSubscribe(s -> subscriptionCount.increment());

        Flux.merge(first, second)
            .take(5000)
            .blockLast();

        System.out.println("firstCount = " + firstCount.get() +
                ", secondCount = " + secondCount.get());

        Assert.assertEquals(2, subscriptionCount.intValue());
    }

(fails with only 1 subscription and outputs "firstCount = 5000, secondCount = 0")

That's because merge (and by extension flatMap) doesn't really aim at parallelizing arbitrary sources nor enforcing fairness.

It rather focuses on allowing asynchronous sources (that would already run on their dedicated execution context) to generate data in parallel, and on how to flatten values from these sources so that the onNext merge produces are not happening concurrently (as required by the reactive streams spec).

Another key element is that once one of merge sources has started, merge lets it emit data at its chosen pace and keeps requesting more data from it (draining the source).

By putting every such source on its own Scheduler, you truly parallelise the sources, and merge is back to its main job of flattening their values...

Our example above becomes:

    @Test
    public void test570TwoFastPublishersIsolated() {
        LongAdder subscriptionCount = new LongAdder();
        AtomicLong firstCount = new AtomicLong();
        AtomicLong secondCount = new AtomicLong();

        Flux<Integer> first = Flux.just(1)
                                  .repeat()
                                  .subscribeOn(Schedulers.elastic())
                                  .doOnNext(v -> firstCount.incrementAndGet())
                                  .doOnSubscribe(s -> subscriptionCount.increment());

        Flux<Integer> second = Flux.just(2)
                                   .repeat()
                                   .subscribeOn(Schedulers.elastic())
                                   .doOnNext(v -> secondCount.incrementAndGet())
                                   .doOnSubscribe(s -> subscriptionCount.increment());

        Flux.merge(first, second)
            .take(5000)
            .blockLast();

        System.out.println("firstCount = " + firstCount.get() +
                ", secondCount = " + secondCount.get());

        Assert.assertEquals(2, subscriptionCount.intValue());
    }

(this test passes and outputs eg. "firstCount = 2578, secondCount = 2444". notice the total exceeds 5000, that's due to prefetch behavior, but the part after the take still only receives 5000 onNext events)

generate will always act on whatever thread it gets requested from from downstream. create can have its own execution context since the data could be generated from an arbitrary thread (like your Emitter does), but if that's not the case it will be alike generate. So that's why you need to isolate subscription to your generate (it is infinite and has no dedicated execution model), and second should be fine (it is infinite but has the Emitter thread execution model).

Side note: It occurred to me that if what you are after is pairwise combination of values from 2 sources, there's actually a better way: zip. Unlike merge, it is tailored for this use case where it requests 1 value from every source to produce 1 zipped value, so it is more fair... But as far as I can tell, this is not really your goal.

westonlast commented 7 years ago

That makes the behavior much more clear. My expectation "all emissions will arrive eventually" is not guaranteed by Reactor; the user must consider each execution context for each emitter and provide enough threads.

My use is pairing asynchronous requests with each corresponding response. Requests always occur before the associated response, and responses might be resource failure. Responses interleave. The zip() operator only pairs the latest request and response, so mergeWith().groupBy() was the simplest solution I could find.

simonbasle commented 7 years ago

I see, grouping by a correlation key I assume... indeed sounds like a job for groupBy.

simonbasle commented 7 years ago

closing for now, I will try to better hint at the danger of an infinite source without publishOn in merge javadoc.