reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Improve Flux.timeout efficiency #2845

Open bruto1 opened 2 years ago

bruto1 commented 2 years ago

The current implementation uses the parallel scheduler by default, which is in essence a ScheduledThreadPoolExecutor whose task queue is guarded by a ReentrantLock. That means two trips through said lock per signal on average: one to schedule the new timeout task and one to cancel the previous one.
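To make the cost concrete, here is a minimal sketch of the cancel-then-reschedule pattern on a plain JDK ScheduledThreadPoolExecutor. It is not reactor-core code: the class EagerTimeoutSketch and its queueOperations counter are hypothetical names used only for illustration, and it assumes the executor has the remove-on-cancel policy enabled so that a cancel also touches the locked delay queue.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; not how FluxTimeout is written.
final class EagerTimeoutSketch {
    private final ScheduledThreadPoolExecutor executor;
    private final long timeoutMillis;
    private final AtomicBoolean timedOut = new AtomicBoolean();
    private ScheduledFuture<?> pending; // guarded by "this"
    private long queueOperations;       // guarded by "this"

    EagerTimeoutSketch(ScheduledThreadPoolExecutor executor, long timeoutMillis) {
        this.executor = executor;
        this.timeoutMillis = timeoutMillis;
    }

    /** Arms the first timeout; call once before any signal. */
    synchronized void start() {
        arm();
    }

    /** Called for every signal: drop the old timeout task, then arm a new one. */
    synchronized void onSignal() {
        // With setRemoveOnCancelPolicy(true), cancel() removes the task from the
        // executor's delay queue, which acquires the queue lock.
        pending.cancel(false);
        queueOperations++;
        arm();
    }

    private void arm() {
        // schedule() inserts into the same delay queue, under the same lock.
        pending = executor.schedule(() -> timedOut.set(true), timeoutMillis, TimeUnit.MILLISECONDS);
        queueOperations++;
    }

    synchronized long queueOperations() {
        return queueOperations;
    }

    boolean isTimedOut() {
        return timedOut.get();
    }
}
```

With this shape, every signal costs one queue removal plus one queue insertion, regardless of whether any timeout ever fires.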

After looking at Netty's IdleStateHandler (https://github.com/netty/netty/blob/4.1/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java), there appears to be a way to do fewer timeout task reschedules if you let go of the idea illustrated by the current marble diagrams (a new signal cancels the previous timeout task).
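A rough sketch of that idea, as I understand it: each signal only records a timestamp, and the single pending timeout task decides when it runs whether the deadline really passed or whether it should re-arm itself for the remaining time. The class LazyTimeoutSketch, its injectable nanoClock and the checkNow method are hypothetical names for illustration; this is neither Netty's nor Reactor's code, and a real operator would need an atomic state machine to close the race between a late signal and the timeout firing.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical illustration of "lazy" rescheduling; names are assumptions.
final class LazyTimeoutSketch implements Runnable {
    private final ScheduledExecutorService executor;
    private final LongSupplier nanoClock; // e.g. System::nanoTime
    private final long timeoutNanos;
    private final Runnable onTimeout;
    private volatile long lastSignalNanos;
    private volatile boolean done;

    LazyTimeoutSketch(ScheduledExecutorService executor, LongSupplier nanoClock,
                      long timeoutNanos, Runnable onTimeout) {
        this.executor = executor;
        this.nanoClock = nanoClock;
        this.timeoutNanos = timeoutNanos;
        this.onTimeout = onTimeout;
        this.lastSignalNanos = nanoClock.getAsLong();
    }

    /** Schedules the one and only pending timeout task. */
    void start() {
        executor.schedule(this, timeoutNanos, TimeUnit.NANOSECONDS);
    }

    /** Hot path: a volatile write, no interaction with the executor's queue. */
    void onSignal() {
        lastSignalNanos = nanoClock.getAsLong();
    }

    void dispose() {
        done = true;
    }

    /** Returns the nanoseconds left before the deadline; a value <= 0 means no re-arm is needed. */
    long checkNow() {
        if (done) {
            return 0L;
        }
        long remaining = timeoutNanos - (nanoClock.getAsLong() - lastSignalNanos);
        if (remaining <= 0) {
            done = true;
            onTimeout.run();
        }
        return remaining;
    }

    @Override
    public void run() {
        long remaining = checkNow();
        if (remaining > 0) {
            // A signal arrived since this task was scheduled: wait only for the rest.
            executor.schedule(this, remaining, TimeUnit.NANOSECONDS);
        }
    }
}
```

In this sketch the executor is touched roughly once per timeout period instead of twice per signal, at the price of the marble-diagram semantics mentioned above.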

WDYT? This really prevents Flux.timeout from being useful with high-frequency publishers that may occasionally stall.

suggestion inspired by https://gitter.im/reactor/reactor?at=61968a44197fa95a1c7adf05

simonbasle commented 2 years ago

In Netty, it looks like IdleStateHandler still uses some form of scheduling with a configurable EventExecutor... Not sure how this would translate in reactor-core and how different it would be.

I was going to suggest using a different Scheduler like Schedulers.newSingle() for that sort of timeouts, but it also relies on ScheduledThreadPoolExecutor... Which makes sense given that the only JDK implementation of ScheduledExecutorService is ScheduledThreadPoolExecutor 😓

One thing to keep in mind is that currently the underlying implementation of timeout(Duration) is actually timeout(Publisher) where the publisher is a Mono.delay. The Netty strategy might be an option but it would imply a separated implementation for timeout(Duration) (as opposed to more generic timeout(Publisher))...

So unless we get a stronger signal that this causes issues for a majority of users, I'm not keen on dedicating much bandwidth to changing timeout(Duration).

There's an element of team prioritization affecting that decision, so we could still accept a PR. That being said, the additional maintenance burden due to the split would warrant some benchmark quantifying the boost such a solution represents before we can definitely accept it.

simonbasle commented 2 years ago

Interesting side note: RxJava seems to have a similar approach where timeout tasks also always trigger, but the id of the "real" timeout task is captured so that an outdated timeout will be a no-op (failing the CAS on their index). That is another potential inspiration, with the same "split-out-timeout(Duration)-case-and-maintain" drawback.
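The index-plus-CAS idea described here can be sketched in isolation, without any scheduler. This is only an illustration of the technique as described in the comment, not RxJava's or Reactor's actual code; IndexedTimeoutSketch and its method names are hypothetical, and it assumes onNext calls are serialized, as Reactive Streams requires.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration: stale timeout tasks are never cancelled,
// they simply lose a compare-and-set when they eventually run.
final class IndexedTimeoutSketch {
    private static final long TERMINATED = Long.MAX_VALUE;

    private final AtomicLong index = new AtomicLong();
    private final Runnable onTimeout;

    IndexedTimeoutSketch(Runnable onTimeout) {
        this.onTimeout = onTimeout;
    }

    /** Returns the timeout task guarding the wait for the first signal. */
    Runnable start() {
        return timeoutTaskFor(0L);
    }

    /**
     * Called per signal. Returns the task to hand to a scheduler,
     * or null once the sequence has already timed out.
     */
    Runnable onNext() {
        long idx = index.get();
        if (idx == TERMINATED || !index.compareAndSet(idx, idx + 1)) {
            return null;
        }
        return timeoutTaskFor(idx + 1);
    }

    private Runnable timeoutTaskFor(long expected) {
        return () -> {
            // Succeeds only if no newer signal has bumped the index since.
            if (index.compareAndSet(expected, TERMINATED)) {
                onTimeout.run();
            }
        };
    }

    boolean isTerminated() {
        return index.get() == TERMINATED;
    }
}
```

The trade-off in this sketch is that every signal still enqueues a task, so the scheduler's queue grows with the signal rate until the stale tasks run and discard themselves.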

bruto1 commented 2 years ago

a benchmark means actually implementing the new timeout and bombing both impls with signals, then comparing throughput?

simonbasle commented 2 years ago

yes, pretty much. I just want to set expectation here in case somebody wants to contribute: it can still get rejected if the benchmark doesn't show enough improvement.

unoexperto commented 2 years ago

@simonbasle Hi Simon. I'm the original guy from the Gitter thread. Could you please take a look at my first take on the problem to determine if it deserves a PR? Here is what I use in production now:

https://gist.github.com/unoexperto/fe6725a9bf20ff04e0cba0fbbf8a7606

I realize the performance benefits are very specific to the nature of the application, but in my case I have a ~3X throughput improvement in my product. In synthetic tests of the pipeline, the original timeout() spends ~38% of its time in Mono.subscribe().

simonbasle commented 2 years ago

thanks for providing that @unoexperto. I had a bit of trouble comparing that code to the FluxTimeout one because of Kotlin, but it seems very close.

I was surprised to see it still had a generic Publisher<?> for the timeout trigger (so using a Mono.delay would still use Schedulers.parallel() by default). It also only covers a subset of the timeout operator API (no generation of timeout triggers per onNext, for instance). So I don't think we can easily integrate that into the reactor codebase.

Overall, looking back at this and at the FluxTimeout code, I think my statement that we'd need to maintain a specific implementation for the time-based timeout was probably wrong. Indeed I get the feeling that the index variable that we monotonically increment in FluxTimeout is enough. We don't really need the timestamp aspect of System.nanoTime().

So the only modification we'd need after all would be that FluxTimeout.TimeoutMainSubscriber#setTimeout stops cancelling the old timeout mono.

The behavior could even be configurable, at least at the constructor level (which would facilitate benchmarking in order to get a clear picture of the throughput improvement vs the gc-pressure/pressure on the scheduler's task queue).

wdyt?

simonbasle commented 2 years ago

@unoexperto @bruto1 looks like I missed the fact that timeout isn't rescheduled in onNext but in doTimeout (which currently is imprecise, but could be improved). So with that approach we DO indeed need nanoTime and we DO need a separate time-only implementation :(

By just eliminating cancellation of old timeout triggers, we retain the generic single implementation. Question is: does that help with performance?

bruto1 commented 2 years ago

It should. A time-only impl would be able to schedule a more precise delay() upon completion of the last one, though.

unoexperto commented 2 years ago

@simonbasle @bruto1

"By just eliminating cancellation of old timeout triggers, we retain the generic single implementation. Question is: does that help with performance?"

Profiling shows that it's not the cancellation that eats CPU but subscription to Mono in this line. That's why at the expense of precision I moved rescheduling to doTimeout when previous timeout is finalized.

Do you think it's possible to implement generic timeout() without exposing concept of time to users? Perhaps .timeout() is misleading name in the first place. Semantically it's more .cancellation(). Thus coarseTimeout should have only one version that accepts Duration.

simonbasle commented 2 years ago

@pderop is conducting some interesting experiments with HashedWheelTimers. we do see a measurable improvement so we might consider it. the only question is what to expose exactly, and I'm leaning towards making an internal implementation of wheel timer at first, to only be used by the timeout(Duration) operator (time-based, default Scheduler). we can always consider exposing it as a Schedulers type of thing later on.
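For readers unfamiliar with the data structure, here is a deliberately tiny hashed wheel timer sketch. It is not @pderop's experiment nor Netty's HashedWheelTimer; WheelTimerSketch, Timeout and advance are hypothetical names, the wheel is driven by manual ticks, and it is single-threaded, whereas a real implementation would need a ticker thread and a thread-safe hand-off for new timeouts.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, single-threaded illustration of a hashed wheel timer.
final class WheelTimerSketch {
    static final class Timeout {
        private final Runnable task;
        private long remainingRounds;
        private boolean cancelled;

        Timeout(Runnable task, long remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }

        /** O(1): only flips a flag; the entry is dropped when its bucket is next visited. */
        void cancel() {
            cancelled = true;
        }
    }

    private final List<List<Timeout>> wheel = new ArrayList<>();
    private long tick; // number of ticks processed so far

    WheelTimerSketch(int wheelSize) {
        for (int i = 0; i < wheelSize; i++) {
            wheel.add(new ArrayList<>());
        }
    }

    /** Schedules a task to run during the delayTicks-th next call to advance(). */
    Timeout schedule(Runnable task, long delayTicks) {
        long delay = Math.max(1L, delayTicks);
        long rounds = (delay - 1) / wheel.size(); // full wheel turns to skip
        Timeout timeout = new Timeout(task, rounds);
        wheel.get((int) ((tick + delay) % wheel.size())).add(timeout);
        return timeout;
    }

    /** Processes one tick: visits a single bucket and runs whatever is due in it. */
    void advance() {
        tick++;
        List<Timeout> bucket = wheel.get((int) (tick % wheel.size()));
        List<Runnable> due = new ArrayList<>();
        for (Iterator<Timeout> it = bucket.iterator(); it.hasNext();) {
            Timeout t = it.next();
            if (t.cancelled) {
                it.remove();
            } else if (t.remainingRounds == 0) {
                it.remove();
                due.add(t.task);
            } else {
                t.remainingRounds--;
            }
        }
        // Run after iterating so a task may safely schedule new timeouts.
        due.forEach(Runnable::run);
    }
}
```

The appeal for the timeout case is that scheduling and cancelling are constant-time bucket operations rather than insertions into and removals from a lock-guarded priority heap, with precision limited to the tick duration.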

pderop commented 2 years ago

@bruto1

Hi,

Out of curiosity, can you confirm whether performance is better when using the timeout operator with a "single" scheduler (Schedulers.single()) instead of the default parallel scheduler?

For example, instead of using the default parallel scheduler like:

                timeout(Duration.ofMillis(100), Mono.just(-1))

then replace by:

                timeout(Duration.ofMillis(100), Mono.just(-1), Schedulers.single())

thanks.

bruto1 commented 2 years ago

I don't actually have a mini benchmark for this, @pderop - I noticed the effect while profiling the entire service at work. Why would single work better, though, if single() uses the same ScheduledThreadPoolExecutor?

    /**
     * Instantiates the default {@link ScheduledExecutorService} for the SingleScheduler
     * ({@code Executors.newScheduledThreadPoolExecutor} with core and max pool size of 1).
     */
    @Override
    public ScheduledExecutorService get() {
        ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, this.factory);
        e.setRemoveOnCancelPolicy(true);
        e.setMaximumPoolSize(1);
        return e;
    }

but only one thread instead of several of parallel() to serve the same number of I/O threads - it should result in more contention for work queue's lock, if anything

simonbasle commented 2 years ago

@bruto1 @pderop observed in his benchmark there was more time spent in the implicit picking of a Worker in Schedulers.parallel(), which isn't the case with Schedulers.single(). The use of single() was thus raised as a potential easy way to improve the situation for the use case covered in this issue (fast-producing publisher with timeouts that are not actually triggered).

it will be interesting to see if contention counterbalances that. But if using single() helps, it means we can avoid introducing a dedicated implementation for time-based timeouts, which would be preferable to me.

bruto1 commented 2 years ago

@pderop can you please share the benchmark code? It's better than mine because I have none (I've gotten rid of most of the blank timeouts since I filed this issue) but the results quoted by @simonbasle are counterintuitive

pderop commented 2 years ago

Hi @bruto1 ,

sure, the sample project is here, I hope it will help to track this issue.

a701440 commented 1 year ago

Hello Guys,

Any update on this issue? We are using version 3.5.5. We have run into this when using a large number of items with Mono timeout. Try the test code below. In local tests only 18,000 or so timeouts run within the 10-second delay at the bottom of the test, in spite of the fact that all Monos have a 50 ms timeout and all are created and subscribed to before the 10 seconds start.

The output I get is:

start count=100000
success=0, fail=18600

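import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Test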
public void testManyTimeouts() throws InterruptedException {
    long count = 100000;
    System.out.println("start count=" + count);
    AtomicLong successCnt = new AtomicLong();
    AtomicLong errorCount = new AtomicLong();
    for (int i = 0; i < count; i++) {
        int val = i;
        Mono<Object> m = Mono.create(sink -> {
            try {
                Thread.sleep(100);
                sink.success(val);
            } catch (InterruptedException e) {
                sink.error(e);
            }
        }).timeout(Duration.ofMillis(50));
        m.subscribeOn(Schedulers.boundedElastic()).subscribe(v -> {
            successCnt.incrementAndGet();
        }, e -> {
            errorCount.incrementAndGet();
        });
    }
    Thread.sleep(10000);
    System.out.println("success=" + successCnt.get() + ", fail=" + errorCount.get());
}

bruto1 commented 7 months ago

Hi @simonbasle. Been away for a while but got back around to this issue after all (better late than never, right?)

So the single scheduler works well as long as there's only 1 thread scheduling and cancelling tasks on it. If there are more, contention for the same ReentrantLock predictably goes up: https://github.com/bruto1/test-reactor-flux-timer-benchmark

so maybe a new impl would be a good idea after all