vert-x3 / vertx-rx

Reactive Extensions for Vert.x
Apache License 2.0

Flowable timeout operator error when converted to Single #294

Closed (mer2github closed 1 year ago)

mer2github commented 1 year ago

When I use the Flowable timeout operator in an Rx chain that is then converted to a Single, I see spurious timeout-related exceptions.

Version

4.4.1

Context

My Rx chain both delivers a result and raises a java.util.concurrent.TimeoutException. I believe the timeout error is incorrect. The key step is converting the Flowable chain to a Single via firstOrError(). This triggers a cancel() during the first onNext() call in the timeout operator's subscriber. The onNext code then schedules a new timeout task even though everything (including the upstream operator) has already been canceled. This guarantees that we eventually get a timeout exception.

Do you have a reproducer?

I have a simple Java code snippet that reproduces the issue for me:

    public void testTimeoutBug() {
        Vertx vertx = Vertx.vertx();

        // set ContextScheduler as the computation scheduler handler
        RxJavaPlugins.setComputationSchedulerHandler(cur -> new ContextScheduler(vertx, false));

        // Flowable w/timeout operator -> Single 
        List<Integer> oneInt = Collections.singletonList(1);
        Disposable d = Flowable.fromIterable(oneInt).timeout(3, TimeUnit.SECONDS).doOnError(t -> {
            System.out.println("Got an error: " + t.getMessage());
        }).firstOrError().subscribe(value -> {
            System.out.println("Got result: " + value.toString());
        });

        try {
            // wait 4 seconds for the timeout to happen
            Thread.sleep(TimeUnit.SECONDS.toMillis(4));
        } catch (InterruptedException ignored) {
        }
    }

Steps to reproduce

  1. Run the above test

Result for me:

Got result: 1
Got an error: The source did not signal an event for 3 seconds and has been terminated.
io.reactivex.rxjava3.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.util.concurrent.TimeoutException: The source did not signal an event for 3 seconds and has been terminated.

The sequence of interest is in the onNext() method of the FlowableTimeoutTimed subscriber (comments are mine):

            task.get().dispose();  // disposes the timeout task that was scheduled initially

            downstream.onNext(t);  // firstOrError cancels upstream here, so cancel() is invoked

            startTimeout(idx + 1); // schedules another 3-second timeout task even though upstream is now canceled
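
The ordering above can be replayed with a small stand-alone model. This is only a sketch: `TimeoutSequenceSketch`, `UncheckedWorker`, and the task names are hypothetical and are not RxJava or Vert.x code. It assumes a worker that accepts new tasks without looking at its disposed flag, which is the behaviour this report attributes to the ContextScheduler's Worker.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the onNext sequence; not RxJava code. */
final class TimeoutSequenceSketch {

    /** Stand-in for a scheduler worker that does NOT check its disposed flag. */
    static final class UncheckedWorker {
        boolean disposed;
        final List<String> pending = new ArrayList<>();

        void schedule(String taskName) {
            pending.add(taskName); // accepted even when disposed is true
        }

        void dispose() {
            disposed = true;
            pending.clear(); // in this model, disposal drops tasks scheduled earlier
        }
    }

    /** Replays dispose-task, emit, startTimeout and returns what is still pending. */
    static List<String> replayOnNext(UncheckedWorker worker) {
        worker.schedule("timeout#0");        // initial timeout armed at subscribe time
        worker.pending.remove("timeout#0");  // task.get().dispose()
        worker.dispose();                    // downstream.onNext(t): firstOrError cancels upstream
        worker.schedule("timeout#1");        // startTimeout(idx + 1) runs after the cancel
        return worker.pending;
    }

    public static void main(String[] args) {
        // prints "[timeout#1]": one stale timeout task survives the cancel
        System.out.println(replayOnNext(new UncheckedWorker()));
    }
}
```

In the model, the surviving `timeout#1` entry corresponds to the task that later fires and produces the TimeoutException after the result has already been delivered.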

Note that if I don't set the scheduler to ContextScheduler, the issue does not reproduce. The reason is that the default Scheduler (ComputationScheduler) has a Worker that first checks whether it is disposed before scheduling anything new, whereas the ContextScheduler's Worker does not perform this check. This is the check in the ComputationScheduler's Worker:

        public Disposable schedule(@NonNull Runnable action) {
            if (disposed) {
                return EmptyDisposable.INSTANCE;
            }

            return poolWorker.scheduleActual(action, 0, TimeUnit.MILLISECONDS, serial);
        }
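
For illustration, the same kind of guard can be sketched on top of a plain JDK executor. This is a minimal sketch under stated assumptions, not the ContextScheduler source and not necessarily how the fix was implemented: `GuardedWorkerSketch` and its methods are hypothetical names, and a boolean return value stands in for returning an empty Disposable.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical worker illustrating a disposed check before scheduling. */
final class GuardedWorkerSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean disposed = new AtomicBoolean();

    /** Schedules the action unless the worker is disposed; returns whether it was accepted. */
    boolean schedule(Runnable action, long delay, TimeUnit unit) {
        if (disposed.get()) {
            return false; // plays the role of returning EmptyDisposable.INSTANCE
        }
        try {
            executor.schedule(action, delay, unit);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // dispose() won the race between the check and the submit
        }
    }

    /** Marks the worker disposed and cancels tasks that have not started yet. */
    void dispose() {
        if (disposed.compareAndSet(false, true)) {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        GuardedWorkerSketch worker = new GuardedWorkerSketch();
        boolean before = worker.schedule(() -> System.out.println("timeout"), 3, TimeUnit.SECONDS);
        worker.dispose(); // stands in for the cancel() triggered by firstOrError()
        boolean after = worker.schedule(() -> System.out.println("timeout"), 3, TimeUnit.SECONDS);
        System.out.println(before + " " + after); // prints "true false"
    }
}
```

With a guard like this, a timeout task requested after the cancel is simply dropped, so no late TimeoutException is raised.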


tsegismont commented 1 year ago

Thank you for the detailed report @mer2github

tsegismont commented 1 year ago

Fixed in a596c54b2a416c6072d1ae2eec7e52e036e429c4