farahats9 / sqlalchemy-celery-beat

Celery Periodic Tasks backed by SQLAlchemy
MIT License
43 stars · 6 forks

Celery Beat worker crashes when deleting a schedule #8

Closed miguelvalente closed 2 months ago

miguelvalente commented 6 months ago

How I'm deleting a schedule:

# Imports added for context; module paths assumed from sqlalchemy_celery_beat's layout
from sqlalchemy import delete
from sqlalchemy_celery_beat.models import PeriodicTask, PeriodicTaskChanged
from sqlalchemy_celery_beat.session import SessionManager, session_cleanup

async def delete_scheduled_task(task_name: str):
    session_manager = SessionManager()
    session = session_manager.session_factory(db_uri)  # db_uri defined elsewhere

    with session_cleanup(session):
        stmt = delete(PeriodicTask).where(PeriodicTask.name == task_name)

        session.execute(stmt)
        session.commit()

        PeriodicTaskChanged.update_from_session(session)

The error:


celery_beat    | [2024-03-09 10:20:31,162: INFO/MainProcess] DatabaseScheduler: Schedule changed.
celery_beat    | [2024-03-09 10:20:31,184: CRITICAL/MainProcess] beat raised exception <class 'AttributeError'>: AttributeError("'NoneType' object has no attribute 'last_run_at'")
celery_beat    | Traceback (most recent call last):
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/celery/apps/beat.py", line 113, in start_scheduler
celery_beat    |     service.start()
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/celery/beat.py", line 643, in start
celery_beat    |     interval = self.scheduler.tick()
celery_beat    |                ^^^^^^^^^^^^^^^^^^^^^
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/celery/beat.py", line 337, in tick
celery_beat    |     not self.schedules_equal(self.old_schedulers, self.schedule)):
celery_beat    |                                                   ^^^^^^^^^^^^^
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/sqlalchemy_celery_beat/schedulers.py", line 429, in schedule
celery_beat    |     self.sync()
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/sqlalchemy_celery_beat/schedulers.py", line 362, in sync
celery_beat    |     self.schedule[name].save()  # save to database
celery_beat    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/sqlalchemy_celery_beat/schedulers.py", line 179, in save
celery_beat    |     setattr(obj, field, getattr(self.model, field))
celery_beat    | AttributeError: 'NoneType' object has no attribute 'last_run_at'

For now I'm just disabling instead of deleting. I guess it's not a big issue and more out of curiosity. Have you run into this issue? Do you know what's going on with it?
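The "disable instead of delete" workaround can be sketched as follows. This is a minimal, self-contained version using a stand-in model against in-memory SQLite so it runs anywhere; in real code `PeriodicTask` comes from `sqlalchemy_celery_beat.models`, and the `enabled` column is the flag beat checks.

```python
# Sketch: disable a periodic task instead of deleting its row.
# PeriodicTask here is a stand-in for sqlalchemy_celery_beat's model.
from sqlalchemy import Boolean, Column, Integer, String, create_engine, update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class PeriodicTask(Base):  # stand-in model, not the library's
    __tablename__ = "celery_periodic_task"
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    enabled = Column(Boolean, default=True)

def disable_task(session: Session, task_name: str) -> None:
    # Flip the enabled flag rather than deleting the row; beat then skips it.
    session.execute(
        update(PeriodicTask)
        .where(PeriodicTask.name == task_name)
        .values(enabled=False)
    )
    session.commit()

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(PeriodicTask(name="nightly-cleanup"))
    session.commit()
    disable_task(session, "nightly-cleanup")
    state = session.query(PeriodicTask).filter_by(name="nightly-cleanup").one().enabled

print(state)  # False
```

Since the row survives, re-enabling later is just the same `update()` with `enabled=True`.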

farahats9 commented 6 months ago

I couldn't reproduce the issue with your code. I also noticed you are using a coroutine while there are no awaits inside; are you sure about that async function, or is this a snippet from larger code?

setu4993 commented 4 months ago

I was running into the same issue. Turns out, I was attempting to delete a record that didn't exist anyway, i.e. the result of:

stmt = select(PeriodicTask).where(PeriodicTask.name == task_name)

was None, and thus the delete was failing.

miguelvalente commented 4 months ago

Forgot about this. I'll come by later and update. Thanks for the heads up tho :) @setu4993

setu4993 commented 4 months ago

Actually, never mind ^, I'm still running into this issue when deleting any task. It appears that deleting a task instead of disabling it causes this.

My function to delete a task is:

def delete_task(task_name: str) -> None:
    with session_cleanup(session):  # session created elsewhere
        existing_task = session.query(PeriodicTask).filter_by(name=task_name).first()
        if existing_task:
            statement = delete(PeriodicTask).where(PeriodicTask.name == task_name)
            session.execute(statement)
            session.commit()
            PeriodicTaskChanged.update_from_session(session)

And as PeriodicTaskChanged.update_from_session executes, it results in the error from OP. So, the error does occur if a PeriodicTask is deleted.

It does seem like something (this line?) in the PeriodicTaskChanged.update_from_session(session) process tries to pull down a task by the name and can't handle the case when it doesn't exist.

If there was a check here to validate that the schedule actually exists before saving it, I think it'd get around this error.
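A defensive check along those lines could look like this toy sketch. Everything here (the `FakeDB`, the `Entry` class, the method names) is hypothetical and only illustrates the pattern, not the library's actual code: `save()` skips entries whose backing row has disappeared instead of raising `AttributeError` on `None`.

```python
# Toy demonstration of the suggested guard: save() silently skips entries
# whose backing row no longer exists, instead of blowing up on None.

class FakeDB:
    def __init__(self):
        self.rows = {}

    def get(self, name):
        return self.rows.get(name)  # None if the row was deleted

class Entry:
    def __init__(self, db, name, last_run_at=None):
        self.db, self.name, self.last_run_at = db, name, last_run_at

    def save(self):
        obj = self.db.get(self.name)
        if obj is None:          # the guard: row deleted between ticks
            return False
        obj["last_run_at"] = self.last_run_at
        return True

db = FakeDB()
db.rows["kept"] = {"last_run_at": None}
kept = Entry(db, "kept", "2024-03-09")
gone = Entry(db, "gone")         # its row was deleted by another process
results = (kept.save(), gone.save())
print(results)  # (True, False)
```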


This issue perhaps points to a larger question: what's the best way to delete a task? The example in the README suggests disabling it, but the deletion flow isn't documented there, and that's the flow causing this.

setu4993 commented 4 months ago

Some more context on why I'm even doing a delete: I need to update the schedule for an existing task, but that's non-trivial (at least with my experience of SQLAlchemy) without also setting all of the discriminator-dependent values for the schedule myself.

setu4993 commented 3 months ago

@farahats9 : Would love docs on the recommended ways to update and delete schedules. I think that'll help resolve this completely.

farahats9 commented 3 months ago
def delete_task(task_name: str) -> None:
    with session_cleanup(session):
        existing_task = session.query(PeriodicTask).filter_by(name=task_name).first()
        if existing_task:
            statement = delete(PeriodicTask).where(PeriodicTask.name == task_name)
            session.execute(statement)
            session.commit()
            PeriodicTaskChanged.update_from_session(session)

I tested this code and still didn't get any error. So I made test cases for deleting one or multiple tasks, and again they don't fail. Will push them soon.

Could you please share the exact versions, plus the broker and database types, @setu4993 @miguelvalente?

And regarding updating: yes, you either need to update both discriminator AND schedule_id (if you are updating multiple tasks at the same time), or you can use task.schedule_model = your_schedule_model if you are just updating one task. Obviously you need to do session.add and session.commit after that. (You don't need to call PeriodicTaskChanged.update_from_session if you are updating just one task using this method, but you do need to if you are calling the sqlalchemy.update() method.)
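The bulk-update path above can be illustrated with a stand-in model. The real PeriodicTask lives in sqlalchemy_celery_beat.models; the column names below mirror its generic-FK pair, but the discriminator values used here are assumptions for illustration only. The key point: the (discriminator, schedule_id) pair is the foreign key, so a bulk `update()` must set both halves together.

```python
# Stand-in model illustrating why discriminator and schedule_id must be
# updated as a pair when using sqlalchemy.update() on multiple tasks.
from sqlalchemy import Column, Integer, String, create_engine, update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class PeriodicTask(Base):  # stand-in, not the library's model
    __tablename__ = "celery_periodic_task"
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    discriminator = Column(String)   # which schedule table (values assumed)
    schedule_id = Column(Integer)    # row id inside that table

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(PeriodicTask(name="report",
                             discriminator="intervalschedule",
                             schedule_id=1))
    session.commit()
    # The generic FK is the (discriminator, schedule_id) pair: a bulk
    # update must set BOTH, or the pair points at the wrong table/row.
    session.execute(
        update(PeriodicTask)
        .where(PeriodicTask.name == "report")
        .values(discriminator="crontabschedule", schedule_id=7)
    )
    session.commit()
    task = session.query(PeriodicTask).filter_by(name="report").one()
    moved = (task.discriminator, task.schedule_id)

print(moved)  # ('crontabschedule', 7)
```

For a single loaded task, assigning the relationship (task.schedule_model = ...) lets the ORM maintain that pair for you, which is why it's the simpler path.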

setu4993 commented 3 months ago

Thanks, @farahats9.

I'm using Postgres 16.1 as the DB with RabbitMQ 3.12.2 as the broker. I also tried an in-memory DB and it fails there, too. Not sure if it influences things, but I'm running beat and the worker within the same Celery process.

I did start using the update approach you described here a couple of weeks ago and that's been fine. The biggest reason I had for using it was to delete tasks, but that didn't work.

farahats9 commented 3 months ago

@setu4993 Thank you, I was able to finally reproduce the bug when I used Postgres. Pushing a fix in the next 48 hours.

And be careful when using the same process for beat+worker; this is not recommended for production, though it's fine for a development environment. In this setup it will not update the queue correctly in time, so I would avoid it even in development.

farahats9 commented 3 months ago

@setu4993 Can you please confirm, if you have the time, whether the recent pull request fixed your issue? After that I will publish a release with all the recent fixes.

setu4993 commented 3 months ago

That's great to hear! Unfortunately, I haven't had a chance to test this out yet but I'll do it first thing on Monday and report back.

farahats9 commented 2 months ago

I am closing the issue since the new release is out and I didn't receive complaints. Please open another issue if you still have problems.

miguelvalente commented 2 months ago

Stopping by to say thanks. Haven't tested it yet, though.

setu4993 commented 2 months ago

I just finally got around to testing this and v0.8.0 works really well.

Thank you so much for the quick fix and the updated docs @farahats9! Appreciate it a bunch.

(Took a bit on my end to find the time to refactor our workarounds and go back to the flow to delete cleanly.)