the-wondersmith / celery-aio-pool

Celery worker pool with support for asyncio coroutines as tasks
GNU Affero General Public License v3.0

Signals support #13

Open mister-vio opened 1 year ago

mister-vio commented 1 year ago

Hello, is there a way to make Celery signal handlers async? Do you have an idea for a workaround? What I want to achieve is async initialization of some resources after the worker pool is created, like this:

import asyncio

from celery import signals

@signals.worker_init.connect
async def worker_init_dispatch(*args, **kwargs):
    # Async initialization of some resources would go here
    await asyncio.sleep(0)
    print('Do some worker initialization')
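
For context, this can't work as written: as far as I can tell, Celery dispatches signal receivers synchronously, so connecting an async def receiver just creates a coroutine object that is never awaited. A minimal, Celery-free sketch of the problem:

async def handler():
    print('never runs')

result = handler()   # returns a coroutine object; nothing inside executes
print(type(result))  # <class 'coroutine'>; Python later warns it was never awaited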
mister-vio commented 1 year ago

Found a workaround. It's a bit hacky, but it works :) :

import asyncio
import os
from functools import wraps

import celery_aio_pool as aio_pool
from celery import Celery, signals

class CustomAsyncIOPool(aio_pool.pool.AsyncIOPool):
    ...

app = Celery(
    'ai_tasks',
    broker=os.environ['CELERY_BROKER'],
    backend=os.environ['CELERY_BACKEND'],
    worker_pool=CustomAsyncIOPool,
)

def async_signal(func):
    """Run an async signal handler on the worker pool's event loop."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return aio_pool.pool.AsyncIOPool.run_in_pool(func, *args, **kwargs)
    return wrapper

@signals.worker_init.connect
@async_signal
async def worker_init_dispatch(*args, **kwargs):
    await asyncio.sleep(1)
    # Some other async initialization work here
    print('Done')
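
The same decorator should be reusable for other signals. For example, a hypothetical async cleanup hooked to worker_shutdown (untested sketch, assuming the pool's event loop is still available at that point):

@signals.worker_shutdown.connect
@async_signal
async def worker_shutdown_dispatch(*args, **kwargs):
    # Hypothetical async teardown, e.g. closing client sessions or connection pools
    await asyncio.sleep(0)
    print('Shutdown cleanup done')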

All the other solutions I tried caused errors like "attached to a different loop".
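
For anyone hitting the same thing: that error isn't specific to Celery. It shows up whenever an awaitable bound to one event loop is awaited from a task running on a different loop, which is what happens if a signal handler spins up its own loop instead of reusing the pool's. A minimal, Celery-free illustration:

import asyncio

async def make_future():
    # The Future is bound to whichever loop is running this coroutine
    return asyncio.get_running_loop().create_future()

async def await_future(fut):
    return await fut

fut = asyncio.run(make_future())    # created on loop A (closed when run() returns)
try:
    asyncio.run(await_future(fut))  # awaited from a task on loop B
except RuntimeError as exc:
    print(exc)  # "Task ... got Future ... attached to a different loop"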