Miksus / rocketry

Modern scheduling library for Python
https://rocketry.readthedocs.io
MIT License
3.27k stars, 106 forks

Scheduler doesn't run after_success(func) after some time. #187

Open everthought opened 1 year ago

everthought commented 1 year ago

When I start the scheduler, it runs stably for some time, but then, without any message, the after_success(func) tasks stop running. Only the periodic timer functions are still executed.

Screenshot from the DB log: image

The scheduler implementation:

from redbird.repos import SQLRepo
from rocketry import Rocketry
from rocketry.log import MinimalRunRecord
from sqlalchemy import create_engine

# db_settings is the project's own settings object (import not shown)
sec_db_engine = create_engine(db_settings.DB_URL, pool_pre_ping=True)

app_scheduler = Rocketry(
    execution="async",
    # logger_repo=MemoryRepo(),
    logger_repo=SQLRepo(engine=sec_db_engine, table="scheduler_log", model=MinimalRunRecord, id_field="id"),
    config={
        "task_execution": "async",
        "silence_task_prerun": False,
        "silence_task_logging": False,
        "silence_cond_check": False,
        "force_status_from_logs": True,
    },
)

from .tasks import scheduler
app_scheduler.include_grouper(scheduler)

The task implementation:

import logging

from rocketry import Grouper
from rocketry.conds import after_fail, after_success, minutely

scheduler = Grouper()

@scheduler.task(minutely, execution="main")
def check_starting_condition():
    # check_first() and check_second() are project-specific checks (not shown)
    if check_first() and check_second():
        logging.info("Check starting condition passed!")
        return "OK"
    else:
        raise Exception("I'm not allowed to run the task")

@scheduler.task(after_fail(check_starting_condition), execution="main")
def failed_check():
    logging.info("The report_check failed!")

@scheduler.task(after_success(check_starting_condition), execution="async")
async def main_exc_function():
    # an async function call with aiohttp
    return some_stuff

I integrated it with FastAPI:

import asyncio

import uvicorn

class Server(uvicorn.Server):
    """Customized uvicorn.Server

    Uvicorn server overrides signals and we need to include
    Rocketry to the signals."""
    def handle_exit(self, sig: int, frame) -> None:
        app_scheduler.session.shut_down()
        return super().handle_exit(sig, frame)

async def run():
    server = Server(config=uvicorn.Config(
        app,
        loop="asyncio",
        host=server_settings.SERVER_HOST,
        port=server_settings.SERVER_PORT,
        reload=server_settings.SERVER_RELOAD,
        workers=server_settings.SERVER_WORKER,
        timeout_keep_alive=server_settings.SERVER_TIME_OUT,
        access_log=False))
    api = asyncio.create_task(server.serve())
    sched = asyncio.create_task(app_scheduler.serve())
    await asyncio.wait([sched, api])

if __name__ == "__main__":
    asyncio.run(run())
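
One thing that may help narrow down a silent stop: with its default settings, `asyncio.wait` waits for all tasks to finish and does not re-raise their exceptions, so an error in one task can stay hidden while the other keeps running. Since the periodic tasks still run here, this is not necessarily the cause. The following is only a minimal sketch of how such failures could be surfaced; `fake_scheduler` and `fake_api` are hypothetical stand-ins for `app_scheduler.serve()` and `server.serve()`.

```python
import asyncio

async def fake_scheduler():
    # Hypothetical stand-in for app_scheduler.serve(): fails after a moment.
    await asyncio.sleep(0.01)
    raise RuntimeError("scheduler crashed")

async def fake_api():
    # Hypothetical stand-in for server.serve(): keeps running.
    await asyncio.sleep(3600)

async def run_both():
    sched = asyncio.create_task(fake_scheduler())
    api = asyncio.create_task(fake_api())
    # Return as soon as either task raises, instead of waiting for both.
    done, pending = await asyncio.wait(
        [sched, api], return_when=asyncio.FIRST_EXCEPTION
    )
    for task in pending:
        task.cancel()  # stop the survivor so the process can exit
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        task.result()  # re-raises the stored exception, if any

def demo():
    try:
        asyncio.run(run_both())
    except RuntimeError as exc:
        return str(exc)
    return None
```

Calling `demo()` returns the message of the exception raised by the stand-in scheduler, which shows that the error is no longer swallowed.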

everthought commented 1 year ago

Now it gets even stranger. The tasks started again after 22 missed runs.

image

everthought commented 1 year ago

I have now tested the scheduler for several days. The problem only occurs when I use the SQL repo.

I tried to make the SQLAlchemy engine log the queries via echo=True. Unfortunately, I then get the error message "AttributeError: 'scheduler_log' object has no attribute '_sa_instance_state'"; see logs.txt.
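
An alternative way to see the SQL statements is to configure Python's standard `logging` module directly: SQLAlchemy emits its statements on the `sqlalchemy.engine` logger at INFO level. Whether this avoids the AttributeError above is not known; the sketch below only shows the logging setup, and the format string is an arbitrary choice.

```python
import logging

# Send log output to stderr with timestamps; other loggers stay at WARNING.
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# SQLAlchemy logs SQL statements on the "sqlalchemy.engine" logger at INFO.
sql_logger = logging.getLogger("sqlalchemy.engine")
sql_logger.setLevel(logging.INFO)
```

With this in place, the engine can be created without the echo flag and the statements should still appear in the log output.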

Looking at the query logs from the DB, I noticed that despite setting id_field in the config:

logger_repo=SQLRepo(engine=sec_db_engine, table="scheduler_log", model=MinimalRunRecord, id_field="id")

the logs are queried from the DB using the "created" column: image

Did I miss something in the config?

The documentation mentions that it is better to use an ID column, so this may be the cause of the missed after_success(func) runs.
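
To illustrate why ordering by a timestamp column could be less reliable than ordering by an ID, here is a small sketch using the standard-library `sqlite3` module. The table layout is a hypothetical simplification of `scheduler_log`, and the sketch does not confirm that this is what causes the missed runs; it only shows that two records written with the same `created` value cannot be ordered by that column alone, while an auto-incremented `id` preserves insertion order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified log table; the real scheduler_log schema may differ.
conn.execute(
    "CREATE TABLE scheduler_log ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "created REAL, task_name TEXT, action TEXT)"
)
# Two records written within the same clock tick share one timestamp.
conn.executemany(
    "INSERT INTO scheduler_log (created, task_name, action) VALUES (?, ?, ?)",
    [
        (1700000000.0, "check_starting_condition", "run"),
        (1700000000.0, "check_starting_condition", "success"),
    ],
)

# Count timestamps shared by more than one row: ordering by "created"
# alone cannot tell these rows apart.
tied = conn.execute(
    "SELECT COUNT(*) FROM ("
    "SELECT created FROM scheduler_log GROUP BY created HAVING COUNT(*) > 1)"
).fetchone()[0]

# Ordering by the auto-incremented id always reflects insertion order.
latest_by_id = conn.execute(
    "SELECT action FROM scheduler_log ORDER BY id DESC LIMIT 1"
).fetchone()[0]

print(tied, latest_by_id)
```

If the real log table shows tied `created` values around the missed runs, that would support the hypothesis; if not, the cause is probably elsewhere.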