farahats9 / sqlalchemy-celery-beat

Celery Periodic Tasks backed by SQLAlchemy
MIT License
43 stars, 6 forks

Question: How to relate celery_periodic_task to celery_task_meta #5

Closed miguelvalente closed 6 months ago

miguelvalente commented 6 months ago

Essentially, what I'm looking for is how to associate a Task with the results of its execution, so that I can show instances of failure and the current status of a Task.

I think this can be achieved by either: Option 1: linking celery_periodictask.id to the celery_taskmeta rows. Option 2: giving celery_taskmeta.name the same value as celery_periodictask.name.

I'm just not sure how to carry out the above steps. Celery's documentation does not provide much insight.

Once again, I'm probably missing something glaring. Sorry in advance, I'm getting around to Celery for the first time :).

farahats9 commented 6 months ago

If I understand you correctly, what you want can be achieved through the results backend. Set the results backend to the database you want to save to, and you will get a table for the results.
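As a rough sketch of what "set the results backend to a database" could look like, here is a small configuration module. The module name `celeryconfig.py`, the Redis broker URL, and the SQLite file name are assumptions for illustration only; `result_backend` and `result_extended` are Celery's own setting names. Such a module would typically be loaded with `celery_app.config_from_object("celeryconfig")`.

```python
# celeryconfig.py (hypothetical module name)

# Assumed broker; replace with whatever the project already uses.
broker_url = "redis://localhost:6379/0"

# Store task results in a SQL database through Celery's SQLAlchemy result backend.
# The "db+" prefix selects that backend; the rest is a regular SQLAlchemy URL.
# The SQLite file name here is only an example.
result_backend = "db+sqlite:///results.sqlite3"

# Also save the task name, args, kwargs, worker, retries and queue with each result.
result_extended = True
```

With a database backend configured this way, results land in the celery_taskmeta table mentioned in the question.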

You can access the results later using Celery:

from celery.result import AsyncResult
t = AsyncResult(task_id, app=celery_app)
metadata = t._get_task_meta()
# metadata:
# {'status': 'SUCCESS', 'result': 'the function return value', 'traceback': None, 'children': [], 'date_done': '2022-11-24T13:10:56.584945', 'task_id': '92867bf7-9873-4ff6-8f4a-936ecbeb6229'}

But I don't think this is related to sqlalchemy-celery-beat and is more related to celery's results backend itself. Or are you talking about PeriodicTask not Task ?

miguelvalente commented 6 months ago

But I don't think this is related to sqlalchemy-celery-beat and is more related to celery's results backend itself. Or are you talking about PeriodicTask not Task ?

Sorry, I think I used the wrong terminology. I'm talking about the schedules stored by sqlalchemy-celery-beat.

Thanks for reaching out so fast. This is the task I'm scheduling:

task = PeriodicTask(
    schedule_model=schedule,
    name='Adding to 69',
    task='main.add',
    args='[60, 9]',
)

I see that it gets saved to celery_periodictask (screenshot of the table row).

I would like to associate a row of celery_periodictask with its corresponding rows in celery_taskmeta. I'm just unsure how to add that information. Currently I'm looking at celery.signals to see if I can inject the id of the celery_periodictask row into celery_taskmeta somehow.

farahats9 commented 6 months ago

OK, I understand what you want now. You can make a table that relates a task_id to the PeriodicTask.id. This is only worthwhile if you are going to do joins. If you don't need joins, then I would recommend your second solution, which is setting the task name to be the same as the periodic task name. Then in

from celery.result import AsyncResult
t = AsyncResult(task_id, app=celery_app)
print(t.name)
# will show your task name (the name is only stored when result_extended is enabled)

You can use that to query the PeriodicTask and do what you want.
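To make the join-table option more concrete, here is a minimal sketch using an in-memory SQLite database. The two `celery_*` tables are simplified stand-ins that contain only the columns needed for the example, not the real schemas, and the link table `periodic_task_run` and the helper `runs_for` are hypothetical names invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Simplified stand-ins for the real tables; only the columns used here.
    CREATE TABLE celery_periodictask (id INTEGER PRIMARY KEY, name TEXT UNIQUE);
    CREATE TABLE celery_taskmeta (task_id TEXT PRIMARY KEY, status TEXT, traceback TEXT);
    -- Hypothetical link table: one row per execution of a periodic task.
    CREATE TABLE periodic_task_run (
        periodic_task_id INTEGER NOT NULL REFERENCES celery_periodictask(id),
        task_id TEXT NOT NULL UNIQUE
    );
""")

# Made-up sample rows: one schedule with one successful and one failed run.
conn.execute("INSERT INTO celery_periodictask (id, name) VALUES (1, 'Adding to 69')")
conn.executemany(
    "INSERT INTO celery_taskmeta (task_id, status, traceback) VALUES (?, ?, ?)",
    [("aaa", "SUCCESS", None), ("bbb", "FAILURE", "placeholder traceback text")],
)
conn.executemany(
    "INSERT INTO periodic_task_run (periodic_task_id, task_id) VALUES (?, ?)",
    [(1, "aaa"), (1, "bbb")],
)


def runs_for(conn, periodic_task_name):
    """Return (task_id, status) for every recorded run of one periodic task."""
    return conn.execute(
        """
        SELECT m.task_id, m.status
        FROM celery_periodictask AS p
        JOIN periodic_task_run AS r ON r.periodic_task_id = p.id
        JOIN celery_taskmeta AS m ON m.task_id = r.task_id
        WHERE p.name = ?
        ORDER BY m.task_id
        """,
        (periodic_task_name,),
    ).fetchall()


runs = runs_for(conn, "Adding to 69")
failures = [task_id for task_id, status in runs if status == "FAILURE"]
```

The link table only pays off if the rows are actually written at publish or run time; something has to record each (PeriodicTask.id, task_id) pair when the schedule fires.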

miguelvalente commented 6 months ago

Thanks for the help. In the end I'll use Celery's before_task_publish signal, which allows me to correlate the unique name and the task id:

from celery.signals import before_task_publish

@before_task_publish.connect
def before_task_publish_handler(sender=None, body=None, exchange=None, routing_key=None, **kwargs):
    properties = kwargs.get("properties") or {}
    periodic_task_name = properties.get("periodic_task_name")  # unique name in celery_periodictask
    task_id = properties.get("correlation_id")  # task_id stored in celery_taskmeta
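The handler above only reads the two values. One possible way to keep them is sketched below with plain Python so it can be tried without a broker. The helper names `link_from_properties` and `record`, and the in-memory dictionary, are assumptions for illustration. Because the signal fires in the process that publishes the task, a real setup would more likely write the pair to a database table than to a dictionary.

```python
def link_from_properties(properties):
    """Return (periodic_task_name, task_id), or None if either key is missing."""
    properties = properties or {}
    name = properties.get("periodic_task_name")
    task_id = properties.get("correlation_id")
    if name is None or task_id is None:
        # Not published from a schedule (or no id available): nothing to link.
        return None
    return name, task_id


def record(properties, store):
    """Append the task id to the list kept for its periodic task name."""
    link = link_from_properties(properties)
    if link is not None:
        name, task_id = link
        store.setdefault(name, []).append(task_id)
    return link


# Hypothetical in-memory store: periodic task name -> list of task ids.
runs_by_schedule = {}

# Made-up message properties shaped like the ones read in the handler.
record(
    {"periodic_task_name": "Adding to 69",
     "correlation_id": "92867bf7-9873-4ff6-8f4a-936ecbeb6229"},
    runs_by_schedule,
)
# A message without a periodic task name is ignored.
record({"correlation_id": "some-other-id"}, runs_by_schedule)
```

Inside the signal handler, the call would be `record(kwargs.get("properties"), runs_by_schedule)`.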