reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

PendingTasks leak in TimedScheduler #3642

Closed AChekroun closed 8 months ago

AChekroun commented 9 months ago

In TimedScheduler, Micrometer's pendingTasks counter is never stopped if the underlying scheduler rejects the task.

Expected Behavior

Micrometer's counter pendingTasks must be stopped if the underlying scheduler throws a RejectedExecutionException

Actual Behavior

Micrometer's pendingTasks counter is not stopped when the underlying scheduler throws a RejectedExecutionException, resulting in a never-ending LongRunningTask.

Steps to Reproduce

    @Test
    void test() {
        CountDownLatch cdl = new CountDownLatch(1);
        SimpleMeterRegistry smr = new SimpleMeterRegistry();
        ExecutorService executorService =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>()
            );

        Scheduler originalScheduler = Schedulers.fromExecutorService(executorService); // 1 thread - SynchronousQueue
        Scheduler timedScheduler = Micrometer.timedScheduler(originalScheduler, smr, "timed_scheduler");
        var meterIdStr = "timed_scheduler.scheduler.tasks.pending";
        RequiredSearch requiredSearch = smr.get(meterIdStr);
        LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer();

        Runnable supp = () -> {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };

        // 1 - Runnable is successfully submitted, but will be blocked by CDL
        Assertions.assertDoesNotThrow(() -> timedScheduler.schedule(supp));

        // 2 - Runnable is rejected because the scheduler has only 1 thread, which is still busy with task 1
        Assertions.assertThrows(RejectedExecutionException.class, () -> timedScheduler.schedule(supp));
        cdl.countDown(); // release 1st

        assertEquals(0, longTaskTimer.activeTasks(), "Expected no active task since 2nd submission has been rejected");
    }

Possible Solution

Catch any RejectedExecutionException to stop any pending tasks
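
A minimal sketch of that idea, not the actual TimedScheduler code: the class `PendingTaskTracker` and its `pending` counter are hypothetical stand-ins for the scheduler wrapper and the Micrometer pending sample, and a plain `ExecutorService` plays the role of the delegate scheduler. It only illustrates the pattern of undoing the pending bookkeeping when the delegate rejects the task, then rethrowing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a metrics-decorating scheduler wrapper.
class PendingTaskTracker {
    // Stand-in for the pending sample: tasks submitted but not yet started.
    final AtomicInteger pending = new AtomicInteger();
    private final ExecutorService delegate;

    PendingTaskTracker(ExecutorService delegate) {
        this.delegate = delegate;
    }

    void schedule(Runnable task) {
        pending.incrementAndGet(); // the task is now "pending"
        try {
            delegate.execute(() -> {
                pending.decrementAndGet(); // the task started, so it is no longer pending
                task.run();
            });
        } catch (RejectedExecutionException e) {
            pending.decrementAndGet(); // the task will never start: undo the bookkeeping
            throw e;                   // keep the rejection visible to the caller
        }
    }

    // Mirrors the reproduction above: one accepted task, one rejected task.
    static int demo() {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        PendingTaskTracker tracker = new PendingTaskTracker(pool);

        tracker.schedule(() -> {
            try {
                latch.await(); // keep the only worker thread busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            tracker.schedule(() -> { }); // rejected: no free thread and no queue capacity
        } catch (RejectedExecutionException expected) {
            // rejection is expected here
        }

        latch.countDown(); // release the first task
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return tracker.pending.get();
    }

    public static void main(String[] args) {
        System.out.println("pending after rejection: " + demo());
    }
}
```

Without the catch block, the rejected submission would leave the counter incremented forever, which is the leak described in this report.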

Nicolas125841 commented 8 months ago

Hi @OlegDokuka, could I give this one a try?

I'm thinking of placing try-catch checks around the delegate.schedule calls in TimedScheduler to remove the pending status on rejection. This doesn't seem to affect the periodic version because it doesn't create a pending sample. Does that seem alright?

Thanks.

OlegDokuka commented 8 months ago

please do!

BarexaS commented 8 months ago

Can the changes be applied to all kinds of exceptions? In my case, I have scheduler metrics in DataDog, and from some point on, any type of scheduler can start producing an incomplete scheduled tasks metric that grows from 0 tasks to millions. I suspect the problem is not limited to RejectedExecutionException, since I only use Project Reactor with the subscribeOn/publishOn methods.

chemicL commented 8 months ago

@BarexaS, the fix for this particular bug involves handling exceptions that we anticipate can happen in the API that is controlled by us, so we can stay focused on incremental improvements and get this fix incorporated in the current form to adhere to the original report. Please do open a new issue with a reproducible example which demonstrates the leak you observe, preferably with minimum dependencies.