farahats9 / sqlalchemy-celery-beat

Celery Periodic Tasks backed by SQLAlchemy
MIT License

Cron task is getting called twice #3

Closed seanic1979 closed 6 months ago

seanic1979 commented 6 months ago

I have a function that gets called from a flask endpoint to insert/update a task and its schedule.

@app.task
def update_task_schedule(name):
    session = session_maker()

    schedule = CrontabSchedule.from_schedule(
        session,
        schedules.crontab(
            minute='22',
            hour='*',
            day_of_week='*',
            day_of_month='*',
            month_of_year='*',
        )
    )

    task_stmt = db.select(PeriodicTask).where(PeriodicTask.name == name)
    result = session.execute(task_stmt)
    task = result.first()

    if task is None:
        stmt = db.insert(PeriodicTask).values(
            name=name,
            task='tasks.celery.index',
            schedule_id=schedule.id,
            discriminator=schedule.discriminator)
    else:
        stmt = db.update(PeriodicTask).values(
            schedule_id=schedule.id,
            discriminator=schedule.discriminator).where(PeriodicTask.name == name)

    session.execute(stmt)
    session.commit()

    PeriodicTaskChanged.update_from_session(session)

After this function is called the task looks like this in the database:

'3','index','tasks.celery.index','[]','{}',NULL,NULL,NULL,'{}',NULL,NULL,NULL,'0',NULL,'1',NULL,'0','2024-02-15 15:21:27','','crontabschedule','11'

'11','22','*','*','*','*','UTC'

On the console I see the task being invoked twice:

INFO: 2024/02/15 11:22:00 Task tasks.celery.index[11418f46-42d5-4db5-9605-83831699b097] received DEBUG: 2024/02/15 11:22:00 TaskPool: Apply <function fast_trace_task at 0x10ca44700> (args:('tasks.celery.index', '11418f46-42d5-4db5-9605-83831699b097', {'lang': 'py', 'task': 'tasks.celery.index', 'id': '11418f46-42d5-4db5-9605-83831699b097', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '11418f46-42d5-4db5-9605-83831699b097', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen20489@greg-15-mbp.lan', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '11418f46-42d5-4db5-9605-83831699b097', 'reply_to': '6ddb9a64-3fe0-310e-948b-f79802c585a0', 'periodic_task_name': 'index', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '3654ceb6-cb2b-430a-988d-d33d24e32e61'}, 'reply_to': '6ddb9a64-3fe0-310e-948b-f79802c585a0', 'correlation_id': '11418f46-42d5-4db5-9605-83831699b097', 'hostname': 'celery@greg-15-mbp.lan', 'delivery_info':... kwargs:{})

WARNING: 2024/02/15 11:22:00 INDEXING

INFO: 2024/02/15 11:22:00 Task tasks.celery.index[11418f46-42d5-4db5-9605-83831699b097] succeeded in 0.0007132080000360475s: None INFO: 2024/02/15 11:22:00 Task tasks.celery.index[88c8b00f-920f-4763-9ca9-571f6746cb59] received DEBUG: 2024/02/15 11:22:00 TaskPool: Apply <function fast_trace_task at 0x10ca44700> (args:('tasks.celery.index', '88c8b00f-920f-4763-9ca9-571f6746cb59', {'lang': 'py', 'task': 'tasks.celery.index', 'id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen19113@greg-15-mbp.lan', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'reply_to': '5ccfc9a3-ed52-3606-b511-260e477914f9', 'periodic_task_name': 'index', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '39dacb2b-c6a0-4e31-8179-256eebb46372'}, 'reply_to': '5ccfc9a3-ed52-3606-b511-260e477914f9', 'correlation_id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'hostname': 'celery@greg-15-mbp.lan', 'delivery_info':... kwargs:{})

WARNING: 2024/02/15 11:22:00 INDEXING

INFO: 2024/02/15 11:22:00 Task tasks.celery.index[88c8b00f-920f-4763-9ca9-571f6746cb59] succeeded in 0.0005478840000137097s: None

farahats9 commented 6 months ago

I tested your code and it executed only once. The only change I had was using the session from sqlalchemy-celery-beat like this:

from sqlalchemy_celery_beat.session import SessionManager

session_manager = SessionManager()

@app.task
def update_task_schedule(name):
    session = session_manager.session_factory(db_connection_uri)
    ...

Please add more information: your Python version, SQLAlchemy version, platform, Celery config and version, etc.

seanic1979 commented 6 months ago

Alright down to a single call now. I was starting celery like this:

celery --app=tasks.celery worker --beat --concurrency=1 --loglevel=DEBUG -P solo -S sqlalchemy_celery_beat.schedulers:DatabaseScheduler

I removed "--beat" and it works properly. However, there is something I don't understand. I'm not running a separate celery beat instance, so was beat running twice when I had the "--beat" argument? Once because of "--beat" and once because of the "sqlalchemy_celery_beat.schedulers:DatabaseScheduler" part?

farahats9 commented 6 months ago

Tested again with the same command, no duplicate tasks. If this is happening as soon as you start the worker, it could be a lingering task from before you shut it down. Please check your queue for any remaining tasks.
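Two quick ways to rule out leftovers (a sketch; it assumes the same tasks.celery app name used above and a Unix shell):

```shell
# List any celery processes still running; a stray beat process will
# show "beat" (or "--beat") in its command line.
ps aux | grep '[c]elery'

# Drain any messages left in the queue from a previous run.
# WARNING: this deletes ALL pending tasks for the app; -f skips the prompt.
celery --app=tasks.celery purge -f
```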

As for your question, the --beat argument embeds beat inside the worker process; it is generally recommended only for testing, not for production. The -S (scheduler) argument has nothing to do with how many Celery instances are running; it just tells beat which scheduler class holds the tasks.
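For production, the usual layout (sketched here with the same app name as above) is one worker process and one separate beat process, so the schedule is only ever driven by a single beat instance:

```shell
# Worker only: no --beat flag, so it just consumes tasks from the queue.
celery --app=tasks.celery worker --concurrency=1 --loglevel=INFO

# Beat in its own process; -S tells it which scheduler class to load.
celery --app=tasks.celery beat -S sqlalchemy_celery_beat.schedulers:DatabaseScheduler --loglevel=INFO
```

Running beat twice (for example, once via --beat and once standalone) is exactly what produces duplicate task invocations like the ones in the log above.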

seanic1979 commented 6 months ago

So how are things working for me now, then? I only run:

celery --app=tasks.celery worker --concurrency=1 --loglevel=DEBUG -P solo -S sqlalchemy_celery_beat.schedulers:DatabaseScheduler

So no separate process is running celery beat, yet things are working perfectly. All tasks are getting called as per the schedule. Shouldn't that only run a worker that is waiting for something to do? I admit my celery knowledge is not great :)

farahats9 commented 6 months ago

That doesn't happen and shouldn't happen; the -S argument has no effect if there's no beat running. As I said, this could be a lingering task in the queue, or you may have beat running somewhere.

seanic1979 commented 6 months ago

Disregard my last question. It looks like I had a runaway celery beat process running on my laptop. I rebooted, and things did not work until I put "--beat" back. That is probably why I was getting more than one call to the tasks in the first place.

farahats9 commented 6 months ago

Great! I will close this issue then. Please keep posting feedback if you run into any issues.