helidon-io / helidon

Java libraries for writing microservices
https://helidon.io
Apache License 2.0

Consistent guidance for reactive streams backpressure #5469

Closed: barchetta closed this issue 1 year ago

barchetta commented 1 year ago

We need to align on consistent guidance for reactive streams back pressure. I've heard different takes from different people. For example:

dbClient.execute(exec -> exec.createQuery(sql).execute()).forEach(dbRow -> <doSomething>);

Let's assume the result set is huge. How should this ensure there is back pressure? As I understand it (and I could be wrong), there are two possible scenarios:

  1. doSomething runs its business logic synchronously. This results in back pressure because of the implementation of JdbcStatementQuery.RowPublisher which loops over the result set calling onNext() on each row before getting the next.
  2. doSomething runs asynchronously, dumping its work on a thread pool or a queue as fast as possible. This results in little to no back pressure and can lead to resource exhaustion.
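A rough way to see the difference between the two scenarios is to count how many rows are waiting for processing at any one time. The sketch below is plain Java and does not use Helidon: `emitRows` is a hypothetical stand-in for a publisher that loops over rows and calls onNext() for each one, and the class and method names are invented for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

final class ScenarioSketch {

    /** Hypothetical row loop: emits each row on the caller's thread. */
    static void emitRows(int rows, IntConsumer onNext) {
        for (int row = 0; row < rows; row++) {
            onNext.accept(row); // the next row is not fetched until this returns
        }
    }

    /** Scenario 1: the work runs inside onNext. Returns the peak number of pending rows. */
    static int peakPendingSynchronous(int rows) {
        AtomicInteger pending = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        emitRows(rows, row -> {
            peak.accumulateAndGet(pending.incrementAndGet(), Math::max);
            // ... business logic would run here, on the emitting thread
            pending.decrementAndGet();
        });
        return peak.get();
    }

    /** Scenario 2: onNext only hands the row to an executor. Returns the peak number of pending rows. */
    static int peakPendingAsynchronous(int rows) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch slowWork = new CountDownLatch(1); // stands in for slow business logic
        AtomicInteger pending = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            emitRows(rows, row -> {
                peak.accumulateAndGet(pending.incrementAndGet(), Math::max);
                worker.execute(() -> {
                    try {
                        slowWork.await(); // the work cannot finish while rows are still being emitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    pending.decrementAndGet();
                });
            });
        } finally {
            slowWork.countDown(); // release the queued work
            worker.shutdown();    // queued tasks still run to completion
        }
        return peak.get();
    }
}
```

With this setup, `peakPendingSynchronous(1000)` returns 1, while `peakPendingAsynchronous(1000)` returns 1000: every row ends up queued in memory because nothing slows the emitter down.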

So, which is our guidance? Do 1 and get back pressure "for free"? Or do 2 and provide back pressure exactly how? The second option requires that the user control the rate and magnitude of Subscription.request(), but that is often buried in the terminal operation -- where they don't have control. Or if they want to control back pressure, must the terminal operation be subscribe() where they do have control?

This also raises the question of whether we want to provide a more explicit feature for throttling, one that gives the user direct control over throttling for terminal operations.

Reference:

  1. https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Subscription.html
  2. https://stackoverflow.com/questions/59555464/how-does-subscription-requestn-in-flow-api-perform-backpressure-at-any-n-value: "Generally, there is no reason to call request in onNext an end-Subscriber because it is run synchronously with respect to its immediate upstream Publisher and there is no practical difference between a single request(Long.MAX_VALUE) and repeated request(1). One of the few reasons to call request if the onNext forks off asynchronous work itself and only at the end of that work should more items be requested:"
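
The pattern the quoted answer describes, requesting more items only once the forked work has finished, could look roughly like the sketch below. This is not the code from that answer and not a Helidon API: it uses only `java.util.concurrent.Flow`, with `SubmissionPublisher` standing in for the real publisher, and `OneAtATimeSubscriber` and `RequestAfterWorkDemo` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical subscriber: asks for the next item only after the async work for the current one ends. */
final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
    private final ExecutorService worker;
    private final Consumer<T> work;
    private volatile Flow.Subscription subscription;
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger peakInFlight = new AtomicInteger();
    final AtomicInteger processed = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(1);

    OneAtATimeSubscriber(ExecutorService worker, Consumer<T> work) {
        this.worker = worker;
        this.work = work;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // start with a single item instead of Long.MAX_VALUE
    }

    @Override
    public void onNext(T item) {
        peakInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture.runAsync(() -> work.accept(item), worker)
                .whenComplete((ignored, error) -> {
                    inFlight.decrementAndGet();
                    if (error != null) {
                        subscription.cancel(); // stop the stream on failure
                        done.countDown();
                    } else {
                        processed.incrementAndGet();
                        subscription.request(1); // only now ask for the next item
                    }
                });
    }

    @Override
    public void onError(Throwable throwable) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown(); // note: the last item's work may still be running here
    }
}

final class RequestAfterWorkDemo {
    /** Returns {items processed, peak number of items in flight}. */
    static int[] run(int items) {
        ExecutorService worker = Executors.newFixedThreadPool(4);
        OneAtATimeSubscriber<Integer> subscriber =
                new OneAtATimeSubscriber<>(worker, item -> { /* slow work goes here */ });
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < items; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete after buffered items are delivered
        try {
            subscriber.done.await(10, TimeUnit.SECONDS);
            worker.shutdown();
            worker.awaitTermination(10, TimeUnit.SECONDS); // let the last item finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {subscriber.processed.get(), subscriber.peakInFlight.get()};
    }
}
```

Although the executor has four threads, at most one item is ever in flight, because demand is only signalled from the completion callback. Requesting a larger number up front would allow that many items to be processed concurrently.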

tomas-langer commented 1 year ago

The answer to this question depends heavily on what <doSomething> is. I will try to answer for each option I can see:

  1. doSomething is non-blocking and does not add anything to memory (such as increasing a counter)
  2. doSomething is blocking
  3. doSomething is non-blocking, but does add stuff to memory temporarily (such as writing to a queue, invoking a remote service)

My answers:

  1. use forEach
  2. this is not allowed. You must replace it with a non-blocking implementation, such as executing it on a separate executor service; once you do this, it becomes case 3.
  3. this can be achieved through a flat map, where you map the items to a CompletionStage, Single or Multi created by the asynchronous/non-blocking task (@danielkec please provide your example)
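danielkec commented 1 year ago

.flatMap(i -> Single.create(CompletableFuture.runAsync(() -> {
            // ... something long and blocking
        }), true), // true: a null result (runAsync yields Void) completes the Single as empty
        1,         // maxConcurrency: at most one inner publisher active at a time
        false,     // delayErrors: do not hold errors back until the others finish
        1)         // prefetch: request one item at a time
.ignoreElements();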
barchetta commented 1 year ago

JavaDoc for above flatMap: https://helidon.io/docs/v3/apidocs/io.helidon.common.reactive/io/helidon/common/reactive/Multi.html#flatMap(java.util.function.Function,long,boolean,long)