agronholm / apscheduler

Task scheduling library for Python
MIT License
6.25k stars 707 forks source link

Bug when stopping scheduler with jobs running - APScheduler v4 #946

Open HK-Mattew opened 3 months ago

HK-Mattew commented 3 months ago

Things to check first

Version

4.0.0a5

What happened?

Hello,

I would like to report a bug that occurs after stopping the Scheduler. From what I have noticed, the bug occurs when I stop the scheduler while there is a job running.

Summary: While a job is still running, I use the scheduler's .stop method. I wait until the scheduler.state is in the stopped state. I see that when it reaches the scheduler.state.stopped state, the job that was running before I stopped the scheduler is completed successfully. However, some operations remain pending in the DataStore, such as decreasing the running_jobs of the task document, and the job is not deleted from the job collection.

Tested only with MongoDBDataStore

How can we reproduce the bug?

Code to replicate the bug:

from apscheduler import (
    Scheduler,
    SchedulerRole,
    TaskDefaults,
    RunState
)
from apscheduler.datastores.mongodb import (
    MongoDBDataStore
)
from apscheduler.triggers.interval import IntervalTrigger

from apscheduler.executors.thread import ThreadPoolJobExecutor
from datetime import datetime
import time

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.INFO)

MONGO_DB_URI = '<your-mongodb-uri>'
DB_NAME_FOR_APS = 'tryreplybugaps'

scheduler_web_configs = dict(
    data_store=MongoDBDataStore(
        client_or_uri=MONGO_DB_URI,
        database=DB_NAME_FOR_APS
    ),
    role=SchedulerRole.both,
    max_concurrent_jobs=100,
    job_executors={
        'threadpool': ThreadPoolJobExecutor()
    },
    task_defaults=TaskDefaults(
        job_executor='threadpool',
        max_running_jobs=1
    )
)

with Scheduler(
    **scheduler_web_configs
    ) as scheduler:

    def my_job_func() -> None:

        print('In a lengthy process...')
        time.sleep(10)
        print('Long process completed.')

        return None

    scheduler.add_schedule(
        my_job_func, # This will run in `threadpool` as specified in TaskDefaults.
        trigger=IntervalTrigger(
            hours=1,
            start_time=datetime.now()
        )
    )

    scheduler.start_in_background()

    while len(scheduler._async_scheduler._running_jobs) == 0:
        print('Waiting for some job to start running...')
        time.sleep(5)

    print('Stopping the scheduler when there are still jobs running...', scheduler._async_scheduler._running_jobs)
    scheduler.stop()

    while scheduler.state != RunState.stopped:
        print('Waiting for the scheduler to be in the stopped state...')
        time.sleep(2)

    assert len(scheduler._async_scheduler._running_jobs) == 0

    datastore_tasks = scheduler.get_tasks() # The task continues with the field running_jobs=1
    datastore_jobs = scheduler.get_jobs() # The job was not deleted after execution.

    print('Tasks:', datastore_tasks)
    print('Jobs:', datastore_jobs)

    """
    In my case, the execution went well. The job was completed.

    However, in the task document in the database, the task continues with the field running_jobs=1.,
    But the job is no longer being executed, and this will prevent a new start of the schedule tasks in the next program starts,
    since I use max_running_jobs=1.

    And in the jobs collection, the job document remains in the database. Which I believe should have been deleted, right?
    """

My logs running the sample code:

INFO:apscheduler._schedulers.async_:Added new schedule (task='__main__:my_job_func', trigger=IntervalTrigger(hours=1, start_time='2024-07-28 18:58:32.458838-03:00')); next run time at 2024-07-28 18:58:32.458838-03:00
INFO:apscheduler._schedulers.async_:Scheduler started
Waiting for some job to start running...
In a lengthy process...
Stopping the scheduler when there are still jobs running... {Job(id=UUID('1503845e-75bf-4c90-a583-b0bc138a94ce'), task_id='__main__:my_job_func', schedule_id='a3325d5d-1ff6-4ff4-8ca8-e8ad934ac782', scheduled_fire_time=datetime.datetime(2024, 7, 28, 18, 58, 32, 458838, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=75600))), executor='threadpool', created_at=datetime.datetime(2024, 7, 28, 21, 58, 33, 230771, tzinfo=datetime.timezone.utc))}
Waiting for the scheduler to be in the stopped state...
Waiting for the scheduler to be in the stopped state...
Waiting for the scheduler to be in the stopped state...
Waiting for the scheduler to be in the stopped state...
Long process completed.
INFO:apscheduler._schedulers.async_:Job 1503845e-75bf-4c90-a583-b0bc138a94ce completed successfully
INFO:apscheduler._schedulers.async_:Scheduler stopped
Tasks: [
    Task(
        id='__main__:my_job_func',
        func='__main__:my_job_func',
        job_executor='threadpool',
        max_running_jobs=1,
        misfire_grace_time=None,
        running_jobs=1
        )
    ]
Jobs: [
    Job(
        id=UUID('1503845e-75bf-4c90-a583-b0bc138a94ce'),
        task_id='__main__:my_job_func',
        schedule_id='a3325d5d-1ff6-4ff4-8ca8-e8ad934ac782',
        scheduled_fire_time=datetime.datetime(2024, 7, 28, 18, 58, 32, 458838,
        tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=75600))),
        executor='threadpool',
        created_at=datetime.datetime(2024, 7, 28, 21, 58, 33, 230771, tzinfo=datetime.timezone.utc)
    )
    ]
agronholm commented 3 months ago

I see two problems in the code:

  1. The stop() method is a bit too forceful (by default), and should probably not cancel the scheduler's task group
  2. The job acquisition operation isn't exactly atomic on MongoDB (I don't know how to actually accomplish that given its nature), but a cancellation will abort the operation halfway through.
HK-Mattew commented 3 months ago
  1. The stop() method is a bit too forceful (by default), and should probably not cancel the scheduler's task group

However, the expected behavior when the job is completed normally is:

However, in my case, I used Scheduler.stop before the job was completed. Then the Scheduler showed me the log Job ...id... completed successfully, so the job was completed. However, the job was not deleted from the datastore and the task did not have its running_jobs decrement.

My question is, what is causing the scheduler not to delete the job at the end of its execution and also why did it not decrement the task's running_jobs field?

2. The job acquisition operation isn't exactly atomic on MongoDB (I don't know how to actually accomplish that given its nature), but a cancellation will abort the operation halfway through.

In pymongo you can use Transactions within a session. But I don't think it's necessary in this case, because the task was not canceled or got an error. The job was completed successfully.

If I'm leaving anything unnoticed please let me know.

agronholm commented 3 months ago

In pymongo you can use Transactions within a session. But I don't think it's necessary in this case, because the task was not canceled or got an error. The job was completed successfully.

Transactions don't work on a single node Mongo server.

agronholm commented 3 months ago

My question is, what is causing the scheduler not to delete the job at the end of its execution and also why did it not decrement the task's running_jobs field?

Because stop() currently cancels all the task groups within the scheduler, and there is no shielding to prevent the job release operation from being cancelled.

HK-Mattew commented 3 months ago

In pymongo you can use Transactions within a session. But I don't think it's necessary in this case, because the task was not canceled or got an error. The job was completed successfully.

Transactions don't work on a single node Mongo server.

You are right. So that would be the difficulty because not everyone uses MongoDB with more than one node :/

I use MongoDB with more than one node. So I think I would have to try to adapt something like MongoDBDataStore(allow_transaction=True)

HK-Mattew commented 3 months ago

My question is, what is causing the scheduler not to delete the job at the end of its execution and also why did it not decrement the task's running_jobs field?

Because stop() currently cancels all the task groups within the scheduler, and there is no shielding to prevent the job release operation from being cancelled.

I understood

agronholm commented 3 months ago

In pymongo you can use Transactions within a session. But I don't think it's necessary in this case, because the task was not canceled or got an error. The job was completed successfully.

Transactions don't work on a single node Mongo server.

You are right. So that would be the difficulty because not everyone uses MongoDB with more than one node :/

I use MongoDB with more than one node. So I think I would have to try to adapt something like MongoDBDataStore(allow_transaction=True)

How would that help users with just one node?

HK-Mattew commented 3 months ago

allow_transaction

Unfortunately this would still be a problem. Since transactions would only work for those who used allow_transaction=True

HK-Mattew commented 3 months ago

I found a temporary solution to the problem I'm facing. Just save the jobs that were running when the scheduler stopped and use the .release_job method manually, like this:

asyncio.run(
       scheduler.data_store.release_job(
           ...
           )
)

This worked correctly even after the scheduler stopped.

agronholm commented 3 months ago

I found a temporary solution to the problem I'm facing. Just save the jobs that were running when the scheduler stopped and use the .release_job method manually, like this:

asyncio.run(
       scheduler.data_store.release_job(
           ...
           )
)

This worked correctly even after the scheduler stopped.

This is a dangerous looking "fix". You should be aware that I'm currently in the process of refactoring the stop() method to allow the scheduler to shut down more gracefully, allowing jobs to complete properly if they do so within the allotted time. I'm also considering shielding the release operations from CancelScope cancellation if that looks like it makes sense.

HK-Mattew commented 3 months ago

This is a dangerous looking "fix". You should be aware that I'm currently in the process of refactoring the stop() method to allow the scheduler to shut down more gracefully, allowing jobs to complete properly if they do so within the allotted time. I'm also considering shielding the release operations from CancelScope cancellation if that looks like it makes sense.

It's not really the best solution. But it would help temporarily.

Your idea about CancelScope sounds good. I hope it works well :)

[Another idea] One idea I had would be to work with signals in the Scheduler.

Example:

class Scheduler():
      ...

scheduler = Scheduler()

scheduler.send_signal('stop running new jobs')

"""
I wait until no jobs are running in the scheduler and then use the scheduler.stop() method.
With this, the scheduler would be able to process the job deletion operations after they are executed,
and also decrement the task running_jobs field.
"""

assert len(scheduler._async_scheduler._running_jobs) == 0

scheduler.stop()

This seems like a good solution to the current problem.

agronholm commented 3 months ago

The stop() method already sets the scheduler state to stopping which signals to the background tasks that they should exit their respective loops. Unfortunately, currently there are background tasks which sleep for certain periods of time, and I have to find a way to safely interrupt these tasks in order to allow their task groups to exit.

HK-Mattew commented 3 months ago

The stop() method already sets the scheduler state to stopping which signals to the background tasks that they should exit their respective loops. Unfortunately, currently there are background tasks which sleep for certain periods of time, and I have to find a way to safely interrupt these tasks in order to allow their task groups to exit.

I just checked and, indeed, the AsyncScheduler._process_jobs method has a condition to only run with RunState.started.

So, I think the best bet would be your idea about CancelScope.

Man, I'd like to take this opportunity to thank you for your great work. This new version of APScheduler is looking amazing. I really like it. 😉

agronholm commented 3 months ago

So, I think the best bet would be your idea about CancelScope.

I'm not sure we're on the same page here. I brought up CancelScope because those could be used to shield certain sensitive operations (like releasing a job) from cancellation.

Man, I'd like to take this opportunity to thank you for your great work. This new version of APScheduler is looking amazing. I really like it. 😉

Thanks! Always nice to see one's work appreciated!