agronholm / apscheduler

Task scheduling library for Python
MIT License
5.94k stars 690 forks source link

Worker stops if there are "too much" simultaneous jobs #881

Closed mavahedinia closed 1 month ago

mavahedinia commented 3 months ago

Things to check first

Version

4.0.0a4

What happened?

For my use case, I needed to have separate scheduler and worker instances (since there might be thousands of jobs to be executed in a short time and I need the system to be scalable). Everything was fine and stable until our team decided to take a stress test to see when the system breaks. Here we understood that if we throttle the number of jobs submitted to the scheduler, everything would be fine. However, if we submit like 1000 jobs at the same time to the scheduler, workers are overwhelmed and a deadlock situation is caused in the workers job processing loop; causing the worker to stop acquiring and executing jobs at all without any crashing.

I inspected the code and pinpointed the issue. It is happening inside the _process_jobs function under the scheduler (async) class (line 905 as I am reporting the issue - commit hash is f375b67b783bed67726a5336cda9680f5e946917). On line 938 inside the loop the worker awaits wakeup event, which itself is controlled by the job_added function defined there. this function is called only when a new job is added:

async def job_added(event: Event) -> None:
    if len(self._running_jobs) < self.max_concurrent_jobs:
        wakeup_event.set()
...
self.event_broker.subscribe(job_added, {JobAdded})

This combined with the max_concurrent_jobs constraint controlled on line 927 implies that if there are more than max_concurrent_jobs jobs in the db, the worker acquires and tries to execute them, i.e., appends them to the queue; but if there are no newer jobs scheduled after them, the wakeup_event is not set, resulting in a deadlock situation prohibiting the loop from acquiring more jobs, even if the queue is empty.

To fix that, I propose to change the structure only a little bit:

  1. changing the job_added name to check_queue_capacity
  2. Subscribe that function to both JobAdded and JobReleased events.

This way, we can ensure that if there are more jobs that the worker can handle at once, the worker gets notified after the queue is freed up.

How can we reproduce the bug?

Simply schedule around 1000 jobs, run couple of workers (less than 4) and see that some of the jobs won't be executed after a while, unless you restart the workers manually.

mavahedinia commented 3 months ago

The proposed solution is available at PR#882

agronholm commented 1 month ago

Fixed via 0596db776ffd604a4e819ee7dd6ea6c6e68bb516.