agronholm / apscheduler

Task scheduling library for Python
MIT License

Multiple simultaneous executions of the same job #835

Open vadympop opened 6 months ago

vadympop commented 6 months ago

Things to check first

Version

master

What happened?

JOB_1 is started multiple times simultaneously

Logs:

INFO:     Started server process [4416]
INFO:     Waiting for application startup.
LIFESPAN EVENT
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
2024-01-02 16:23:18.881490 JOB EXECUTE START
2024-01-02 16:23:18.884502 JOB EXECUTE START
2024-01-02 16:23:18.886518 JOB EXECUTE START
2024-01-02 16:32:20.371804 JOB EXECUTE START
2024-01-02 16:32:20.373802 JOB EXECUTE START
2024-01-02 16:32:20.375804 JOB EXECUTE START
2024-01-02 16:32:20.376797 JOB EXECUTE START
2024-01-02 16:38:18.878416 JOB EXECUTE END
2024-01-02 16:38:18.879395 JOB EXECUTE END
2024-01-02 16:38:18.880385 JOB EXECUTE END
2024-01-02 16:42:20.370167 JOB EXECUTE START
2024-01-02 16:42:20.372169 JOB EXECUTE START
2024-01-02 16:47:20.389294 JOB EXECUTE END
2024-01-02 16:47:20.390294 JOB EXECUTE END
2024-01-02 16:47:20.390294 JOB EXECUTE END
2024-01-02 16:47:20.391297 JOB EXECUTE END

In the logs we can see that three instances of the same job started at "2024-01-02 16:23:18", and several started simultaneously again at "2024-01-02 16:32:20". So how can we prevent a new run of a job from starting while a previous run hasn't finished? And how can we ensure that no more than one instance of the same job runs at a time on one server?
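As a workaround until the scheduler can enforce this by itself, overlapping runs of the same coroutine can be skipped with a plain `asyncio.Lock` guard inside the job function. This is a minimal stdlib sketch (not APScheduler API; the short sleep stands in for the real 300-second workload):

```python
import asyncio
import datetime

_job_lock = asyncio.Lock()  # one lock per logical job
skipped = 0                 # counts runs skipped because a previous one was still active

async def job_1():
    global skipped
    if _job_lock.locked():
        # A previous run is still in progress: skip instead of overlapping.
        skipped += 1
        return
    async with _job_lock:
        print(f"{datetime.datetime.now()} JOB EXECUTE START")
        await asyncio.sleep(0.2)  # stands in for the real 300 s workload
        print(f"{datetime.datetime.now()} JOB EXECUTE END")

async def main() -> int:
    # Fire three "simultaneous" runs, as in the logs above: only one executes.
    await asyncio.gather(job_1(), job_1(), job_1())
    return skipped

print(asyncio.run(main()))  # prints 2: two of the three runs were skipped
```

Note this only guards against overlap within one process; runs dispatched by other scheduler processes sharing the same data store would need coordination through the database instead.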

How can we reproduce the bug?

Code:

# imports needed to run the snippet
import asyncio
import datetime
from contextlib import asynccontextmanager

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine
from starlette.middleware import Middleware
from starlette.types import ASGIApp, Receive, Scope, Send

from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.triggers.interval import IntervalTrigger

async def job_1():
    print(f"{datetime.datetime.now()} JOB EXECUTE START")
    await asyncio.sleep(300)
    print(f"{datetime.datetime.now()} JOB EXECUTE END")

class SchedulerMiddleware:
    def __init__(self, app: ASGIApp, scheduler: AsyncScheduler) -> None:
        self.app = app
        self.scheduler = scheduler

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] == "lifespan":
            async with self.scheduler:
                await self.scheduler.add_schedule(job_1, IntervalTrigger(minutes=2), id="job_1")
                await self.scheduler.start_in_background()
                await self.app(scope, receive, send)
        else:
            await self.app(scope, receive, send)

@asynccontextmanager
async def lifespan(_: FastAPI):
    print("LIFESPAN EVENT")
    yield

engine = create_async_engine("postgresql+asyncpg://...")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker, max_concurrent_jobs=1)
app = FastAPI(lifespan=lifespan, middleware=[Middleware(SchedulerMiddleware, scheduler=scheduler)])

Rows in the jobs DB table at startup:

"4e961180-5a95-44b6-9d18-65c7ce3e9e17"  "apscheduler_test:job_1"    "binary data"   "binary data"   "job_1" "2024-01-01 21:44:12.480608+00" "00:00:00"      "00:00:00"  "2024-01-01 21:44:12.487833+00"     "DESKTOP-TN7O7MC-4416-2556781170176"    "2024-01-02 14:32:50.359799+00"
"5cc5aa72-ccac-42b5-ac5c-d72b266535fa"  "apscheduler_test:job_1"    "binary data"   "binary data"   "job_1" "2024-01-02 14:12:20.322882+00" "00:00:00"      "00:00:00"  "2024-01-02 14:12:20.450873+00"     "DESKTOP-TN7O7MC-4416-2556781170176"    "2024-01-02 14:32:50.359799+00"
"a4056968-fa8b-4d4f-95f9-a1c49183ee5d"  "apscheduler_test:job_1"    "binary data"   "binary data"   "job_1" "2024-01-02 14:22:20.322882+00" "00:00:00"      "00:00:00"  "2024-01-02 14:22:20.331503+00"     "DESKTOP-TN7O7MC-4416-2556781170176"    "2024-01-02 14:32:50.359799+00"

I don't know why there are three rows for the same job in the jobs table; maybe it's because I shut down the server before the job executions ended.