earlgreyness / aio-celery

Celery worker for running asyncio coroutine tasks

Exceptions raised in worker tasks not propagated to AsyncResult.get() in the app #3

Open rkrell opened 5 months ago

rkrell commented 5 months ago

Yet another issue:

There is no clean handling of exceptions raised in async tasks. The AsyncResult.get() method in the aio_celery app just hangs.

Raising the exceptions on the app side would make it possible to generate a proper response in API endpoints, simply by catching them and converting them to the appropriate HTTP status.
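For illustration, this is the kind of pattern I would like to be able to write in an API endpoint (just a sketch of the desired behaviour, assuming AsyncResult.get() re-raised worker exceptions; the FastAPI endpoint, the HTTPException mapping and the module path of metadata_task are only assumptions to make the example concrete):

    from fastapi import FastAPI, HTTPException

    from celery_app.tasks.metadata.tasks import metadata_task

    api = FastAPI()

    @api.get("/metadata/{barcode}")
    async def read_metadata(barcode: str):
        res = await metadata_task.delay(barcode, "/some/base/path")
        try:
            # Desired behaviour: if the worker task raised, get() re-raises here
            return await res.get()
        except Exception as exc:
            # Convert the worker-side failure into a proper HTTP response
            raise HTTPException(status_code=502, detail=str(exc)) from exc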

rkrell commented 5 months ago

Worker trace

Traceback (most recent call last):
  File "/home/rkrell/.pyenv/versions/project-pypy3.10-7.3.13/lib/pypy3.10/site-packages/aio_celery/worker.py", line 191, in on_message_received
    result = await coro
  File "/home/rkrell/work/project/backend/celery_app/tasks/metadata/tasks.py", line 24, in metadata_task
    raise Exception(desc)

This code in my aio_celery app:

        res: AsyncResult = await metadata_task.delay(barcode, base_path)
        return await res.get()

just hangs.

rkrell commented 5 months ago

I worked around this by copying the exception-serialization code from the celery project and testing for serialized exceptions on the app side. I ended up overriding Celery.send_task() to hook this in, generating a dict from the raised exception and returning it as the result instead of just logging the exception. On the aio_celery app side, if an exception is recognized in the result, the dict is converted back into a Python exception and raised in the app itself. To be honest, this still needs a dependency on the kombu library to help with serialization. All in all it is rather over-complicated.
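Roughly, the idea looks like this (a heavily simplified sketch, not my actual code: the helper names are made up, and in my real workaround the serialization/deserialization is done with the celery/kombu utilities mentioned above):

    import importlib

    # Worker side: instead of only logging the exception, turn it into a plain
    # dict that can travel through the result backend as an ordinary result.
    def exception_to_dict(exc: BaseException) -> dict:
        return {
            "__task_exception__": True,
            "exc_module": type(exc).__module__,
            "exc_type": type(exc).__name__,
            "exc_args": [str(a) for a in exc.args],
        }

    # App side: if the result looks like a serialized exception, rebuild the
    # exception class (falling back to Exception) and raise it locally.
    def maybe_raise(result):
        if isinstance(result, dict) and result.get("__task_exception__"):
            try:
                module = importlib.import_module(result["exc_module"])
                exc_cls = getattr(module, result["exc_type"])
            except (ImportError, AttributeError):
                exc_cls = Exception
            raise exc_cls(*result["exc_args"])
        return result

With this in place, the app side calls maybe_raise(await res.get()) instead of using the result directly.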

I'd really appreciate native exception support in aio_celery, to keep AsyncResult.get() from hanging when an exception is raised on the worker.