reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Add a new side-effect operator for `Publisher#subscribe()` #1716

Closed. simonbasle closed this issue 5 years ago.

simonbasle commented 5 years ago

The doOnSubscribe method hooks into the signals of a Subscriber, and Subscriber doesn't have a subscribe() method. Unfortunately, the method names Subscriber#onSubscribe and Publisher#subscribe are so close that Flux#doOnSubscribe ends up a bit ambiguous.
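To make the distinction concrete, here is a minimal sketch using the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams Publisher/Subscriber/Subscription contracts. It is not Reactor code: it only shows that Publisher#subscribe is called by whoever wants the data, while Subscriber#onSubscribe is a callback the Publisher invokes afterwards, possibly after doing some work of its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Sketch: who calls subscribe() vs who calls onSubscribe(), using the JDK Flow interfaces.
class SubscribeVsOnSubscribe {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Publisher#subscribe: invoked by the party that wants the data (here, run()).
        Flow.Publisher<Integer> publisher = subscriber -> {
            log.add("Publisher#subscribe");
            log.add("publisher prepares its work");
            // Only now does the Publisher hand a Subscription to the Subscriber.
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
        };

        // Subscriber#onSubscribe: a callback invoked by the Publisher, not by the user.
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { log.add("Subscriber#onSubscribe"); }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

A hook living in onSubscribe (like doOnSubscribe) therefore only sees the third entry, after any preparatory work the Publisher did inside subscribe.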

The intent of doOnSubscribe was to cover cases where you want to look at the Subscription object, which sometimes arrives deferred.

Unfortunately, this is also the closest method people find that seems to allow "logging the start of the sequence" / "logging the subscribe() call".

So maybe we need a new do???(Runnable) method that is more obviously dedicated to that, like doFinally is dedicated to covering all possible ends of a sequence.

Descriptive and differentiated naming is hard, though... I will add a few potential names in subsequent comments that people can vote on; don't hesitate to offer ideas too 😉

simonbasle commented 5 years ago

doFirst

Diverges from the doOn* pattern, but one could argue it is similar to doFinally; it is a nice counterpart to doFinally, with a clear intent.

simonbasle commented 5 years ago

doAtStart

Same as above: captures the usual intent of users quite well. Maybe easier to discover in autocompletion suggestions?

simonbasle commented 5 years ago

doOnPublisherSubscribe

Closer to the reality of the implementation / underlying signals and conforms to the regular pattern, but a bit of a mouthful...

simonbasle commented 5 years ago

doBeforeSubscribe

Captures the signal (Publisher#subscribe()), the intent, and the ordering vs Subscriber#onSubscribe, but departs from our pattern of assuming a doOn* executes before its signal unless otherwise specified (e.g. doAfterTerminate)...

vy commented 5 years ago

@simonbasle, do you mean the following is incorrect:

```java
return findAll(query)
        .doOnSubscribe(ignored -> LOGGER.info("started (query={})", query));
```

For hooking to the first emission, this is generally what I do:

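```java
Flux.defer(() -> {
    // a fresh flag is allocated for every subscription
    boolean[] first = new boolean[] { true };
    return findAll()
            .doOnNext(item -> {
                // only the first onNext of this subscription is logged
                if (first[0]) {
                    first[0] = false;
                    LOGGER.info("first item: {}", item);
                }
            });
});
```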

(Note that Flux.defer() above is necessary to avoid a corrupted first state during re-subscriptions triggered by, say, retry().) Given this lengthy workaround, I would really appreciate something like doOnFirst().
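Why the flag has to live inside defer can be shown with a small sketch on the JDK's java.util.concurrent.Flow interfaces (Java 9+). The helpers fromList, doOnNext, defer and subscribeAll are hypothetical, simplified stand-ins for the Reactor operators (fromList, for instance, ignores the requested amount and emits everything at once), not Reactor's implementation. The pipeline is subscribed twice, as a retry() would do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch: per-subscription state with a defer-like operator (hypothetical helpers).
class DeferStateSketch {

    // Emits all items on the first request (simplified: ignores the requested amount).
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return s -> s.onSubscribe(new Flow.Subscription() {
            boolean done;
            @Override public void request(long n) {
                if (done) return;
                done = true;
                for (T item : items) s.onNext(item);
                s.onComplete();
            }
            @Override public void cancel() { done = true; }
        });
    }

    // Runs the action for each item before passing it downstream.
    static <T> Flow.Publisher<T> doOnNext(Flow.Publisher<T> source, Consumer<T> action) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(T item) { action.accept(item); downstream.onNext(item); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    // Calls the supplier anew for every subscriber, so each subscription gets fresh state.
    static <T> Flow.Publisher<T> defer(Supplier<Flow.Publisher<T>> supplier) {
        return downstream -> supplier.get().subscribe(downstream);
    }

    // Subscribes once and requests everything.
    static <T> void subscribeAll(Flow.Publisher<T> publisher) {
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
    }

    static List<String> run(boolean useDefer) {
        List<String> log = new ArrayList<>();
        Supplier<Flow.Publisher<String>> withFirstLogging = () -> {
            boolean[] first = { true };   // one flag per call of this supplier
            return doOnNext(fromList(List.of("a", "b", "c")), item -> {
                if (first[0]) {
                    first[0] = false;
                    log.add("first item: " + item);
                }
            });
        };
        Flow.Publisher<String> pipeline = useDefer ? defer(withFirstLogging) : withFirstLogging.get();
        subscribeAll(pipeline);   // initial subscription
        subscribeAll(pipeline);   // re-subscription, as a retry() would trigger
        return log;
    }
}
```

Without defer the flag is shared, so run(false) logs "first item: a" only once; with defer each subscription gets its own flag, so run(true) logs it twice.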

Additionally, what I find missing from the previous comments is the actual intended behavior of the doFirst() you have in mind. You gave examples of existing shortcomings, but I could not clearly understand what exactly you want doFirst() to do.

simonbasle commented 5 years ago

For more in-depth context

```
(END HERE: data starts flowing from Publisher1)
^
| Publisher1      Publisher2      Publisher3
| |::::::::^    :::::::::::^    :::::::::::^
| |        |    :          |    :          |
| |      p1.sub :        p2.sub :        p3.subscribe(s4)
s2.onSub(S1)    :          |    :          |
| v        |:::::          |:::::          |
| subscriber2     subscriber3    subscriber4 <------(START HERE: op3.subscribe(subscriber4))
|    |             ^   |            ^ ___
|    |s3.onSub(S2)_|   |s4.onSub(S3)|    |
|                                        |
S1.req <---------- S2.req <-------- S3.request(n)
```

Note that generally, in the diagram above, S3 (Subscription #3) and s3 (Subscriber #3) would be the same thing in Reactor's architecture.

:::: represents an execution path where a Publisher could do some preparatory work before calling subscribe on its source publisher.

Where does the existing doOnSubscribe fit?

It executes at the level of sX+1.onSub(SX). In Reactor, for a given Subscriber, this usually translates to subscriberXPlusOne.onSubscribe(this) being called by subscriberX.

It is typically placed at the end of the chain, i.e. as operator Publisher3 in the diagram above, so it would execute right before s4.onSub(S3).

Where does the proposed solution fit?

At the level of pX.sub (typically implemented as publisherX.subscribe(this) in subscriberX+1).

It would receive the subscribe(SubscriberX+1) call and execute its handler there, then subscribe to PublisherX-1.

Since the subscribe calls are made backwards (from the last operator towards the source), the closer to the end of the chain of operators it is, the greater the chances that it will truly execute before anything else.
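The placement argument can be checked with a small sketch, again on the JDK's java.util.concurrent.Flow interfaces (Java 9+). The doFirst helper below is a hypothetical stand-in, not Reactor's implementation: it simply runs its handler inside Publisher#subscribe and then subscribes upstream, without wrapping the Subscriber. Stacking two of them shows that the one applied last (closest to the end of the chain) runs first, and both run before the source's own preparatory work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Sketch: a doFirst-like operator hooked into Publisher#subscribe (hypothetical helper).
class DoFirstOrderingSketch {

    // Handler runs during subscribe(), before the operator subscribes to its own source.
    static <T> Flow.Publisher<T> doFirst(Flow.Publisher<T> source, Runnable handler) {
        return downstream -> {
            handler.run();
            source.subscribe(downstream);
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // A source doing some preparatory work in subscribe() before calling onSubscribe.
        Flow.Publisher<String> source = subscriber -> {
            log.add("source: preparatory work");
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
        };
        // "inner" is applied first (closer to the source), "outer" last (closer to the end).
        Flow.Publisher<String> chain = doFirst(doFirst(source, () -> log.add("inner doFirst")),
                                               () -> log.add("outer doFirst"));
        chain.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { log.add("final subscriber: onSubscribe"); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return log;
    }
}
```

Running it logs "outer doFirst", then "inner doFirst", then the source's preparatory work, and finally the onSubscribe of the last subscriber.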

An example

⚠️ Currently, users are trying to use doOnSubscribe to log something at the start of the sequence, with some surprises:

```java
Mono.just(5)
    .flatMapMany(i -> {
      log("prepare range for " + i);
      return Flux.range(i * 10, 3);
    })
    .doOnSubscribe(sub -> log("start"))
    .doOnTerminate(() -> log("end"))
    .blockLast();
```

This prints out:

```
prepare range for 5
start
end
```

✅ Instead, doOnSubscribe should be seen as "pipeline is ready for requests". So with the new operator it would become:

```java
Mono.just(5)
    .flatMapMany(i -> {
      log("prepare range for " + i);
      return Flux.range(i * 10, 3);
    })
    .doOnSubscribe(sub -> log("subscription is ready for requests"))
    .doFirst(() -> log("start"))
    .doOnTerminate(() -> log("end"))
    .blockLast();
```

Which would print:

```
start
prepare range for 5
subscription is ready for requests
end
```

Ignoring methods that merely pass signals through in these operators, the order of execution is roughly:


  1. doOnTerminatePublisher.subscribe(blockLastSubscriber)
  2. doFirstPublisher.subscribe(doOnTerminateSubscriber)
  3. log from doFirst
  4. doOnSubscribePublisher.subscribe(doFirstSubscriber)
  5. flatMapManyPublisher.subscribe(doOnSubscribeSubscriber)
  6. flatMapMany detects that its source is a scalar Callable, skips the subscribe-onSubscribe-request dance on Mono.just entirely, and simply resolves 5
  7. log from flatMapMany (which also prepares a FluxRange publisher, not started yet)
  8. doOnSubscribeSubscriber.onSubscribe(flatMapManySubscription)
  9. log from doOnSubscribe
  10. doFirstSubscriber.onSubscribe(doOnSubscribeSubscriber) <-- this object is both a Subscriber and a Subscription
  11. doOnTerminateSubscriber.onSubscribe(doFirstSubscriber)
  12. ... all the way to blockLast, which starts requesting from its Subscription; each operator passes the request through...
  13. ...flatMapMany starts pumping data from the Flux.range...
  14. ...data flows through, until onComplete reaches doOnTerminateSubscriber
  15. log from doOnTerminateSubscriber
  16. blockLast returns 52

smaldini commented 5 years ago

I also suggest doOnSubscriber as a name; however, I wonder about the signature's argument: Runnable vs Consumer<Subscriber>.

I'm not super happy with the current doOnSubscribe providing access to the Subscription, which is mostly unusable per se. If anything, those lifecycle methods should either provide access to context manipulation or provide a safe Subscription that ignores requests (in effect, a Disposable version).
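One way to read the "safe Subscription" idea is a view that swallows request(n), so a lifecycle hook cannot disturb backpressure, while still forwarding cancel(), much like a Disposable. The sketch below uses the JDK's java.util.concurrent.Flow.Subscription (Java 9+); the class name RequestIgnoringSubscription is hypothetical and does not correspond to any Reactor API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Sketch: a Subscription view for lifecycle hooks (hypothetical class, not a Reactor API).
final class RequestIgnoringSubscription implements Flow.Subscription {
    private final Flow.Subscription delegate;

    RequestIgnoringSubscription(Flow.Subscription delegate) { this.delegate = delegate; }

    @Override public void request(long n) {
        // intentionally a no-op: demand stays under the control of the real Subscriber
    }

    @Override public void cancel() {
        delegate.cancel();   // cancelling remains possible, similar to Disposable#dispose
    }

    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        Flow.Subscription real = new Flow.Subscription() {
            @Override public void request(long n) { calls.add("request(" + n + ")"); }
            @Override public void cancel() { calls.add("cancel()"); }
        };
        Flow.Subscription safe = new RequestIgnoringSubscription(real);
        safe.request(10);   // swallowed by the safe view
        safe.cancel();      // forwarded to the real Subscription
        real.request(1);    // the real Subscriber can still request
        return calls;
    }
}
```

In demo(), only "cancel()" and "request(1)" reach the real Subscription; the hook's request(10) is dropped.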

simonbasle commented 5 years ago

> I also suggest doOnSubscriber for a name, however I wonder about the signature argument Runnable vs Consumer<Subscriber>.

I don't really like doOnSubscriber, as it doesn't do much to resolve the ambiguity:

Remember, we're trying to offer an obvious path of least resistance to users who look for "a do* method that executes before anything else in the chain of operators".

rstoyanchev commented 5 years ago

Having doOnSubscribe and doOnSubscriber side by side might make them easy to mix up, but then again they're so close that perhaps it's okay even if you pick one at random. Overall, the similarity actually reinforces the fact that these are two very similar, yet slightly different, points in the lifecycle, so on the whole I prefer doOnSubscriber as the best option. It also has the advantage of communicating clearly, whereas all the others require a bit of a mental leap and explanation.

simonbasle commented 5 years ago

@smaldini I think we can introduce a doFirst with a simplified Runnable signature in 3.2.10.RELEASE, as this low-hanging fruit would benefit 3.2.x users. wdyt?

The consensus seems to be on doFirst, and we can always explore doOnSubscriber as a bit more advanced alternative in 3.3.