agronholm / apscheduler

Task scheduling library for Python
MIT License
5.98k stars 694 forks

Using APScheduler in a FastAPI lifespan does not work #844

Closed pangxiaobin closed 6 months ago

pangxiaobin commented 6 months ago

Things to check first

Version

APScheduler==4.0.0a4

What happened?

Using APScheduler in a FastAPI lifespan does not work.

How can we reproduce the bug?

Use docker start postgresql

docker run --rm -d  --name postgresql -e POSTGRESQL_PASSWORD=docker -e POSTGRESQL_DATABASE=data_base -p 5433:5432 bitnami/postgresql:12.9.0

dependency

fastapi==0.108.0
asyncpg==0.29.0
uvicorn==0.25.0
SQLAlchemy==2.0.25
APScheduler==4.0.0a4

example

# demo.py
from __future__ import annotations

import sys
import threading
from contextlib import asynccontextmanager
import datetime

from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine

from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

# docker run --rm -d  --name postgresql -e POSTGRESQL_PASSWORD=docker -e POSTGRESQL_DATABASE=data_base -p 5433:5432 bitnami/postgresql:12.9.0
engine = create_async_engine("postgresql+asyncpg://postgres:docker@127.0.0.1:5433/data_base")

data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
async_scheduler = AsyncScheduler(data_store, event_broker)

def tick():
    print(f"threading {threading.get_ident()} Hello, the time is", datetime.datetime.now())

async def add_task():
    async with async_scheduler as scheduler:
        await scheduler.add_schedule(
            tick, IntervalTrigger(seconds=1), id="tick"
        )

async def async_scheduler_start():
    await add_task()
    try:
        async with async_scheduler as scheduler:
            await scheduler.start_in_background()
    except Exception as e:
        print(e)
        sys.exit()
    print('async_scheduler_start')

async def async_scheduler_shutdown(wait: bool = False):
    try:
        async with async_scheduler as scheduler:
            if not wait:
                await scheduler.stop()
            else:
                await scheduler.wait_until_stopped()
    except Exception as e:
        print('stop')
    print('stop')

@asynccontextmanager
async def lifespan(app: FastAPI):
    await async_scheduler_start()
    yield
    await async_scheduler_shutdown()

app = FastAPI(lifespan=lifespan)

log

$ uvicorn demo:app
INFO:     Started server process [4474]
INFO:     Waiting for application startup.
async_scheduler_start
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

I've called start_in_background, but I never see the output of the tick print.

Why can't my implementation, which is similar to the asgi_fastapi.py example you provided, run successfully? Do you have any suggestions?

agronholm commented 6 months ago

Because your implementation shuts down the scheduler as soon as async_scheduler_start() exits (when the context manager block is exited).
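The lifetime problem described above can be sketched with a toy stand-in (ToyScheduler is a hypothetical class, stdlib only, not the real APScheduler API): exiting the `async with` block shuts the scheduler down, so starting it inside a short-lived context manager stops it immediately.

```python
import asyncio

# Hypothetical stand-in for AsyncScheduler, used only to illustrate
# the context-manager lifetime issue.
class ToyScheduler:
    def __init__(self):
        self.running = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        # Leaving the context manager shuts the scheduler down,
        # mirroring what AsyncScheduler does on exit.
        self.running = False

    async def start_in_background(self):
        self.running = True

async def broken():
    sched = ToyScheduler()
    async with sched as s:      # context exits at the end of this block...
        await s.start_in_background()
    return sched.running        # ...so the scheduler is already stopped here

async def working():
    sched = ToyScheduler()
    async with sched as s:      # keep the context open for the app's lifetime
        await s.start_in_background()
        # ... serve requests here (the lifespan `yield` point) ...
        running_during_app = s.running
    return running_during_app

print(asyncio.run(broken()))    # False: stopped as soon as the block exited
print(asyncio.run(working()))   # True: still running while inside the block
```

This is why the scheduler context must wrap the lifespan's `yield` rather than live inside a helper function that returns before the application starts serving.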

agronholm commented 6 months ago

What's the problem with my solution? Why did you have to make your own?

pangxiaobin commented 6 months ago

await async_scheduler_shutdown()

I am currently studying the relevant material and trying out alternative solutions. The FastAPI documentation says that with a lifespan, the code before the yield runs when the application starts and the code after the yield runs when it shuts down. I implemented it this way so that I could manually shut down the scheduler worker when the application exits.

On the other hand, if multiple workers are specified using uvicorn, will there be multiple scheduler workers running simultaneously?

agronholm commented 6 months ago

I am currently studying the relevant material and trying out alternative solutions. The FastAPI documentation says that with a lifespan, the code before the yield runs when the application starts and the code after the yield runs when it shuts down. I implemented it this way so that I could manually shut down the scheduler worker when the application exits.

I'm not sure what you're talking about. There is no yield statement anywhere in the asgi_fastapi.py example.

On the other hand, if multiple workers are specified using uvicorn, will there be multiple scheduler workers running simultaneously?

Yes, but that's not a problem on APScheduler 4, as it supports multiple concurrently running scheduler instances.

pangxiaobin commented 6 months ago

Thanks a lot. I'll use the asgi_fastapi demo you provided.

unights commented 4 months ago

@pangxiaobin I have a similar issue; here is my solution.

# -*- coding: utf-8 -*-
import threading
from contextlib import asynccontextmanager
from datetime import datetime

from apscheduler import AsyncScheduler
from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.eventbrokers.local import LocalEventBroker
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI

scheduler: AsyncScheduler
data_store = MemoryDataStore()
event_broker = LocalEventBroker()

def tick():
    print(f"threading {threading.get_ident()} Hello, the time is", datetime.now())

async def add_task():
    try:
        await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
    except NameError as e:
        raise RuntimeError("Not init.") from e

@asynccontextmanager
async def lifespan(application: FastAPI):  # noqa: F841
    global scheduler
    async with AsyncScheduler(data_store, event_broker) as scheduler:
        await scheduler.start_in_background()
        await add_task()  # just for test
        yield
        await scheduler.stop()
        await scheduler.wait_until_stopped()

app = FastAPI(lifespan=lifespan)

or wrap it in an extra lifespan (I prefer this):

# -*- coding: utf-8 -*-
import threading
from contextlib import AsyncExitStack, asynccontextmanager
from datetime import datetime
from typing import TYPE_CHECKING

from apscheduler import AsyncScheduler
from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.eventbrokers.local import LocalEventBroker
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI

if TYPE_CHECKING:
    from apscheduler.abc import DataStore, EventBroker

scheduler: AsyncScheduler

def tick():
    print(f"threading {threading.get_ident()} Hello, the time is", datetime.now())

async def add_task():
    try:
        await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
    except NameError as e:
        raise RuntimeError("Not init.") from e

@asynccontextmanager
async def scheduler_lifespan(data_store: 'DataStore', event_broker: 'EventBroker'):
    global scheduler
    async with AsyncScheduler(data_store, event_broker) as scheduler:
        await scheduler.start_in_background()
        await add_task()  # just for test
        yield
        await scheduler.stop()
        await scheduler.wait_until_stopped()

@asynccontextmanager
async def lifespan(application: FastAPI):  # noqa: F841

    async with AsyncExitStack() as stack:
        await stack.enter_async_context(
            scheduler_lifespan(
                data_store=MemoryDataStore(),
                event_broker=LocalEventBroker(),
            )
        )
        yield

app = FastAPI(lifespan=lifespan)

Hope it helps you.

peterschutt commented 4 months ago

I'm doing something very similar with Litestar's lifespan context, @unights. Is there a reason you explicitly await scheduler.stop() and scheduler.wait_until_stopped() inside the scheduler context, given that the context itself awaits stop() on exit? In other words, is there a difference between your example and this?

@asynccontextmanager
async def scheduler_lifespan(data_store: 'DataStore', event_broker: 'EventBroker'):
    global scheduler
    async with AsyncScheduler(data_store, event_broker) as scheduler:
        await scheduler.start_in_background()
        await add_task()  # just for test
        yield
    await scheduler.wait_until_stopped()

FWICT, yours will wait for the scheduler to reach a stopped state before anything registered on the scheduler's internal exit stack has been exited, and I'm curious whether this is deliberate.

unights commented 4 months ago

I'm doing something very similar with Litestar's lifespan context, @unights. Is there a reason you explicitly await scheduler.stop() and scheduler.wait_until_stopped() inside the scheduler context, given that the context itself awaits stop() on exit? In other words, is there a difference between your example and this?

@asynccontextmanager
async def scheduler_lifespan(data_store: 'DataStore', event_broker: 'EventBroker'):
    global scheduler
    async with AsyncScheduler(data_store, event_broker) as scheduler:
        await scheduler.start_in_background()
        await add_task()  # just for test
        yield
    await scheduler.wait_until_stopped()

FWICT, yours will wait for the scheduler to reach a stopped state before anything registered on the scheduler's internal exit stack has been exited, and I'm curious whether this is deliberate.

No difference, it's just personal habit. :smile:

peterschutt commented 4 months ago

OK great, thanks for responding!

nkondakov commented 2 weeks ago

I'm doing something very similar with Litestar's lifespan context, @unights. Is there a reason you explicitly await scheduler.stop() and scheduler.wait_until_stopped() inside the scheduler context, given that the context itself awaits stop() on exit? In other words, is there a difference between your example and this?

@asynccontextmanager
async def scheduler_lifespan(data_store: 'DataStore', event_broker: 'EventBroker'):
    global scheduler
    async with AsyncScheduler(data_store, event_broker) as scheduler:
        await scheduler.start_in_background()
        await add_task()  # just for test
        yield
    await scheduler.wait_until_stopped()

FWICT, yours will wait for the scheduler to reach a stopped state before anything registered on the scheduler's internal exit stack has been exited, and I'm curious whether this is deliberate.

Peter, I'm currently trying to integrate APScheduler into a Litestar app. Could you please share how you initialized APScheduler? When I start it with run_until_stopped, the scheduler runs and triggers tasks but the endpoints don't work; with start_in_background, the endpoints work but no tasks are triggered.