reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Add an operator to ensure running on a specific Scheduler to avoid overhead #3313

Closed mdindoffer closed 7 months ago

mdindoffer commented 1 year ago

I would very much like to have an operator that ensures elements are published on a given Scheduler, but does not force a context switch when one is not necessary, and therefore avoids needless overhead. For example: ensurePublishOn(Scheduler)

Motivation

Our application has organically grown to a big beast that uses Project Reactor in most of the layers (from UI down to the DB). This leads to many situations where a caller may invoke a method of another component that returns a Flux of elements and then further processes them. E.g.:

public Mono<Void> doSomething() {
    return otherComponent.produceAFluxOfStuff()
            .doOnNext()
            .handle()
            .reduce()
            .then();
}

Since the otherComponent#produceAFluxOfStuff does not explicitly state what Scheduler is used to produce the Stuff, the method #doSomething() does not know either. If the operators in #doSomething() are doing heavy IO, one should for example choose the BoundedElastic scheduler, to make sure we don't block a thread that should not be blocked (for example threads from Parallel Scheduler). So usually we end up with something like this:

public Mono<Void> doSomething() {
    return otherComponent.produceAFluxOfStuff()
            .publishOn(Schedulers.boundedElastic())
            .doOnNext()
            .handle()
            .reduce()
            .then();
}

Now, on the surface this looks fine - it's defensive in design and works correctly, even if the implementation of #produceAFluxOfStuff() is changed later. However, it's inefficient.

The publishOn(Scheduler) operator always forces a context switch - from one thread to another. So even if our Stuff elements in the Flux were already emitted on a thread from BoundedElastic, we would still force switching a context to another thread. A concrete example:

@Test
void testReactor() {
    Flux.just(1,2,3,4,5)
            .publishOn(Schedulers.boundedElastic())
            .doOnNext(integer -> LOG.warn("Processing a number: {}", integer))
            .publishOn(Schedulers.boundedElastic())
            .doOnNext(integer -> LOG.warn("Still processing a number: {}", integer))
            .blockLast();
}

Outputs (notice the use of both boundedElastic-2 and boundedElastic-1):

08-12-2022 17:13:10.389 [boundedElastic-2] WARN  c.a.d.a.DummyTest - Processing a number: 1
08-12-2022 17:13:10.389 [boundedElastic-2] WARN  c.a.d.a.DummyTest - Processing a number: 2
08-12-2022 17:13:10.390 [boundedElastic-2] WARN  c.a.d.a.DummyTest - Processing a number: 3
08-12-2022 17:13:10.390 [boundedElastic-1] WARN  c.a.d.a.DummyTest - Still processing a number: 1
08-12-2022 17:13:10.390 [boundedElastic-2] WARN  c.a.d.a.DummyTest - Processing a number: 4
08-12-2022 17:13:10.390 [boundedElastic-1] WARN  c.a.d.a.DummyTest - Still processing a number: 2
08-12-2022 17:13:10.390 [boundedElastic-1] WARN  c.a.d.a.DummyTest - Still processing a number: 3
08-12-2022 17:13:10.390 [boundedElastic-2] WARN  c.a.d.a.DummyTest - Processing a number: 5
08-12-2022 17:13:10.390 [boundedElastic-1] WARN  c.a.d.a.DummyTest - Still processing a number: 4
08-12-2022 17:13:10.390 [boundedElastic-1] WARN  c.a.d.a.DummyTest - Still processing a number: 5

A context switch to a different thread is an expensive operation and something that could be conditionally avoided.

Desired solution

To ensure we're running on a given Scheduler without unnecessary switching, I propose adding a new operator (or set of operators), such as Flux#ensurePublishOn(Scheduler). This operator would act just like its existing publishOn(Scheduler) counterpart, with one difference - it would be a no-op if the execution context already belongs to the supplied Scheduler.

public Mono<Void> doSomething() {
    return otherComponent.produceAFluxOfStuff()
            .ensurePublishOn(Schedulers.boundedElastic()) // No thread switching, yay!
            .doOnNext()
            .handle()
            .reduce()
            .then();
}

Considered alternatives

The naming is up for discussion. Maybe assertPublishOn is better? Naming aside, I cannot think of any simple alternative to this.

Additional context

Since many Scheduler instances of a given type may exist, it would be best if the condition logic deciding to switch or not would consider the type instead of identity. But, if this proves to be too difficult to do, I would be satisfied with comparison on the instance level.

Also, the subscribeOn(Scheduler) could get its own ensure counterpart to have feature parity.

chemicL commented 1 year ago

Thank you for the detailed report and for clearly stating your expectations. We had a discussion within the team about this suggestion. At first glance, it seems like a nice feature to have; however, there are some points worth considering before deciding to implement or use such a feature at all.

The first question to ask is - is it to improve performance? If so, it goes without saying that introducing additional logic to infer and alter the behaviour can on its own be a performance hit. We have to consider both assembly time and subscription/runtime. If performance is a concern, the entire pipeline would most probably require coordination, and insight into what otherComponent#produceAFluxOfStuff is doing would be necessary.

Nevertheless, let's consider for a moment that the approach you are considering taking is necessary:

Since the otherComponent#produceAFluxOfStuff does not explicitly state what Scheduler is used to produce the Stuff

This can happen when produceAFluxOfStuff comes from an external dependency, or you decide so in your project that the domain of produceAFluxOfStuff is outside of your control or influence.

One approach in the latter case is to introduce some clear contracts as to where the values will be published to avoid the overhead of adding publishOn. In the former case, libraries, and indeed operators themselves, usually provide enough information to understand where "out-of-band" publishing (outside the regular flow driven by the subscriber's demand) can happen: either on Schedulers for time-based operations (e.g. delayElements) or on the Netty EventLoop in the case of WebClient.

As you might not have insight into the transformations the called function is doing, it might be difficult to do any sort of analysis in the reactor codebase to infer where values can be published. At assembly time, which is where an actual performance benefit could come from, we could potentially scan the pipeline and traverse it from downstream to upstream: if we notice a publishOn operator, we could report which Scheduler is in use. That sounds appealing at first. But we would also need to check for operators that introduce an asynchronous boundary, e.g. the ones with queues, where consumption from the internal queue can happen on any Thread - bear in mind that Subscription::request can come from anywhere and the values can get delivered from that "anywhere" as long as the reactive streams rules are preserved (here - the rules about delivering signals serially). Even if we had that covered, there are custom operators out there, and it would not be possible to infer their behaviour.

Doing runtime assertion would be more difficult and would essentially require what publishOn is already doing - using an internal queue, but potentially reusing the publishing Scheduler instead of a supplied one. The reason for that is again, reactive-stream spec. Let's consider where elements E1, E2 are delivered from produceAFluxOfStuff - E1 comes from an external DelayItemsScheduler and E2 comes from the desired, MyScheduler. If E1 was re-scheduled to MyScheduler, but E2 was immediately delivered, the order could be altered and E2 could arrive before E1. To preserve the order in which E1 and E2 arrive, we would need to enqueue E1 first and E2 afterwards before they are delivered.
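The reordering described above can be reproduced without Reactor. The following is a minimal sketch using plain JDK executors, not Reactor's actual operator machinery: the class name, the thread names "my-scheduler" and "delay-items", and the onNextNaive helper are all hypothetical stand-ins for MyScheduler, DelayItemsScheduler and a naive "switch only if needed" operator. A latch guarantees that E1 is signalled before E2, as the serial-delivery rule requires.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NaiveConditionalSwitchDemo {

    /** Returns the order in which a naive "switch only if needed" step delivers E1 and E2. */
    public static List<String> run() throws InterruptedException {
        List<String> delivered = new CopyOnWriteArrayList<>();
        // Hypothetical stand-in for MyScheduler: one worker thread with a recognizable name.
        ExecutorService myScheduler =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "my-scheduler"));
        CountDownLatch e1HandedOff = new CountDownLatch(1);

        // The worker is busy with a task that emits E2 only after E1 has been signalled.
        myScheduler.execute(() -> {
            try {
                e1HandedOff.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            onNextNaive("E2", myScheduler, delivered); // second signal, already on the target thread
        });

        // E1 is signalled first, from a foreign thread (stand-in for DelayItemsScheduler).
        Thread foreign = new Thread(() -> {
            onNextNaive("E1", myScheduler, delivered);
            e1HandedOff.countDown();
        }, "delay-items");
        foreign.start();
        foreign.join();

        myScheduler.shutdown(); // already-queued tasks still run
        myScheduler.awaitTermination(5, TimeUnit.SECONDS);
        return delivered;
    }

    // Naive rule: deliver directly when already on the target thread, otherwise re-schedule.
    private static void onNextNaive(String item, ExecutorService target, List<String> out) {
        if (Thread.currentThread().getName().equals("my-scheduler")) {
            out.add(item);
        } else {
            target.execute(() -> out.add(item)); // queued behind the work currently running
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints [E2, E1]
    }
}
```

E1 is handed to the target executor but sits in its queue while E2, emitted on the target thread, is delivered immediately. Avoiding this requires enqueueing every element, including those that are already on the right thread.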

We are currently not planning to implement anything outside of what is already available, but we will keep this issue open. There might be smart ways to address the problem, but it will require some research. Things might be easier at the root, e.g. when emitting items to a source Publisher and the ideas from that exploration could prove to be useful in this type of optimization. However, for now, this issue will be on the back burner.

Please do provide more insight into your particular case if I misunderstood the intention in any way. But for the general case, a contract with the publisher of data in the produceAFluxOfStuff component might be necessary that specifies on what Threads the publishing can happen on.

chemicL commented 1 year ago

Note that the assembly time "workaround" with diagnosing types of operators in the reactive chain can be error-prone at the moment - while discussing this subject we also discovered some operators can report incorrect runtime characteristics: #3318.

mdindoffer commented 1 year ago

Thanks for the thorough explanation. Yes, the whole point of this is to reduce overhead / increase performance, which is lost when trying to write reactive code in a defensive/foolproof style.

I admit I didn't think about all the nitty gritty details when writing this. I completely neglected custom operators, which I agree are complicating the assembly-time scanning.

One approach in the latter case is to introduce some clear contracts as to where the values will be published to avoid the overhead of adding publishOn

100% agree. However, there is no API or external tooling that can enforce this contract. One can only add javadocs where possible, and those don't guarantee anything. There's no "type safety" so to speak. I'd like to have some confidence in the code without relying on documentation that may be easily overlooked.

Doing runtime assertion would be more difficult and would essentially require what publishOn is already doing - using an internal queue, but potentially reusing the publishing Scheduler instead of a supplied one. The reason for that is again, reactive-stream spec. Let's consider where elements E1, E2 are delivered from produceAFluxOfStuff - E1 comes from an external DelayItemsScheduler and E2 comes from the desired, MyScheduler. If E1 was re-scheduled to MyScheduler, but E2 was immediately delivered, the order could be altered and E2 could arrive before E1. To preserve the order in which E1 and E2 arrive, we would need to enqueue E1 first and E2 afterwards before they are delivered.

Good point, the performance benefit might be entirely negated by the internal queueing. But if further processing on a desired MyScheduler is required, one would have to use publishOn anyway, which would incur the same or even worse overhead, depending on runtime circumstances.

On that topic - perhaps it would be possible to at least add an operator that tries to assert this contract at runtime? I.e. assertPublishedOn(Scheduler), which would only query (not switch!) the thread at runtime and throw an error if it doesn't belong to the required Scheduler. While this would not perform any smart conditional thread switching, it would avoid the problematic internal queueing altogether. With this runtime assert, one could test their code and get rid of any unnecessary publishOn operators. This would be, at last, an implementation of the threading contract you mentioned earlier. Violations of the threading contract would be discovered at runtime and then remediated by a code change (usually on the producer's side of things).
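A rough approximation of such a check is possible today with doOnNext, under one loud assumption: that the Scheduler can be recognized by its thread-name prefix (e.g. "boundedElastic-", as seen in the log output earlier in this thread). Reactor offers no operator named assertPublishedOn; the ThreadContract class and the assertThreadNamePrefix method below are hypothetical names for a JDK-only sketch. It cannot tell apart two Scheduler instances that share a prefix, and it will not recognize custom Schedulers with other thread names.

```java
import java.util.function.Consumer;

public final class ThreadContract {

    private ThreadContract() {}

    /**
     * Returns a consumer that throws if it is invoked on a thread whose name
     * does not start with the expected prefix. It only inspects the current
     * thread; it never switches threads and never queues elements.
     */
    public static <T> Consumer<T> assertThreadNamePrefix(String expectedPrefix) {
        return item -> {
            String actual = Thread.currentThread().getName();
            if (!actual.startsWith(expectedPrefix)) {
                throw new IllegalStateException("Expected a thread named '" + expectedPrefix
                        + "*' but " + item + " was published on '" + actual + "'");
            }
        };
    }

    // Hypothetical usage in a Reactor pipeline (requires reactor-core on the classpath):
    //
    //   otherComponent.produceAFluxOfStuff()
    //           .doOnNext(ThreadContract.assertThreadNamePrefix("boundedElastic-"))
    //           ...
}
```

An exception thrown from the consumer would surface as an error signal in the pipeline, so a contract violation fails the stream rather than being silently corrected.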

mdindoffer commented 1 year ago

A shower thought of today, if I may: the error-throwing runtime variant of assertPublishedOn could be useful for reactor-test / StepVerifier as well. One could test in unit/integration tests whether a correct Scheduler is emitting elements from a given method. Although I know even less about StepVerifier than I do about core, so I have no idea on how copy-pastable that implementation would be.

chemicL commented 7 months ago

@mdindoffer taking all of the above into consideration, along with the lack of further interest from the community, I think it is time to close this proposal. I agree a test utility could be added to at least validate the contract, but due to its low priority, I can only offer to review and accept a contribution if one is put forward. Thanks, and feel free to open a PR.