snok / asgi-correlation-id

Request ID propagation for ASGI apps
MIT License
413 stars 32 forks

ID generator for celery extension #51

Closed · dapryor closed this 2 years ago

dapryor commented 2 years ago

Refer to #50

Please review and I can address any requested changes :)

Added customizable generator options to the celery extension to better match the capabilities of the correlation ID middleware.
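
For reference, the middleware side already lets callers swap out the ID generator; a minimal sketch of that usage looks roughly like this (the FastAPI app and the shortened-uuid generator here are just illustrative, not part of this PR):

from uuid import uuid4

from fastapi import FastAPI

from asgi_correlation_id import CorrelationIdMiddleware

app = FastAPI()

# The middleware takes a zero-argument callable used to mint new IDs;
# the goal of this PR is to give the celery extension hooks the same kind of knob.
app.add_middleware(CorrelationIdMiddleware, generator=lambda: uuid4().hex[:16])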

sondrelg commented 2 years ago

I think this looks good :+1:

Would you mind adding one or two tests, and updating the final line of .github/workflows/tests.yml to say if: matrix.python-version == '3.10.6'? A bit pedantic perhaps, but nice to maintain the 100% :man_shrugging:

dapryor commented 2 years ago

I am actually going to rethink this PR a bit. We use celery ourselves, and we just realized we can't correlate the actual task id with the id that these functions create.

I'm still investigating, but it looks like the actual task id comes in through the kwargs of

@task_prerun.connect(weak=False)
def worker_prerun(task: 'Task', **kwargs: Any) -> None:
    ...

I am going to see if we can either provide a generator to match how it is currently done, or use the default task id so we can correlate with Flower.

dapryor commented 2 years ago

After investigating further, it is possible to get the task_id from the task_prerun hook.

task_prerun = Signal(
    name='task_prerun',
    providing_args={'task_id', 'task', 'args', 'kwargs'},
)

I am going to add this as an opt-in parameter so as not to break people's existing code.
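
Roughly, I'm imagining something like this for the hook. This is just a sketch of the idea; the parameter names (generator, use_internal_celery_task_id) and the local contextvar are illustrative rather than the PR's exact API:

from contextvars import ContextVar
from typing import Any, Callable
from uuid import uuid4

from celery.signals import task_prerun

# Stand-in for the contextvar the extension actually uses.
celery_current_id: ContextVar[str] = ContextVar('celery_current_id', default='')


def configure_hooks(
    generator: Callable[[], str] = lambda: uuid4().hex,
    use_internal_celery_task_id: bool = False,  # opt-in; the default keeps the old behaviour
) -> None:
    @task_prerun.connect(weak=False)
    def worker_prerun(task: Any, task_id: str = '', **kwargs: Any) -> None:
        # Reuse celery's own task id (the one visible in Flower) when opted in,
        # otherwise fall back to the configured generator.
        celery_current_id.set(task_id if use_internal_celery_task_id else generator())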

sondrelg commented 2 years ago

Not sure how the current implementation would break existing code, but sounds good. Let me know when it's ready for another look :+1:

dapryor commented 2 years ago

What is currently in this PR should not break existing code, since I defaulted the new arguments to the values that were previously hardcoded.

The addition I will be making that lets it use the celery task id will also not break existing code, since I will make it optional and opt-in.

I will hopefully get to updating this this week.

sondrelg commented 2 years ago

Sounds good @dapryor. I just had one quick unrelated question:

Are you running celery alongside an async framework like starlette or fastapi?

I'm only asking because we did the same thing for a while, thinking we could do some decorator shenanigans to spin up a new event loop for each task and get around celery's sync limitation (celery doesn't have async support yet, right?). The other option was having separate sync and async dependencies for every resource we needed to interact with (we want to make async calls to postgres, but celery can't use the same handler when it needs to talk to the database, right :face_exhaling:), but that seemed even worse. After a lot of effort trying to get it to work, I set aside an afternoon to get rid of celery and migrate to an async task framework, and a hundred issues immediately disappeared. It's also just a lot more resource efficient.

Not saying this to dissuade you from anything, just wanted to share our experience in case you're in a similar situation.

In case you're interested, we migrated to arq, which worked well, and then to saq; I'd recommend both of those :+1:
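
To give you an idea, a bare-bones arq setup looks roughly like this (the task, URL, and settings here are just illustrative):

import asyncio

import httpx
from arq import create_pool
from arq.connections import RedisSettings


async def fetch_url(ctx: dict, url: str) -> int:
    # arq tasks are plain async functions; the worker passes a context dict first.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.status_code


class WorkerSettings:
    # Start the worker with: arq path.to.this_module.WorkerSettings
    functions = [fetch_url]
    redis_settings = RedisSettings()


async def main() -> None:
    # Enqueue a job from the web app (or anywhere with Redis access).
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('fetch_url', 'https://example.com')


if __name__ == '__main__':
    asyncio.run(main())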

JonasKs commented 2 years ago

+1 @sondrelg on arq! Currently debating to migrate to saq too.

dapryor commented 2 years ago

@sondrelg We are using fastapi and celery in our stack. We are relatively early in development, but have not had any issues yet. I'd be interested in seeing your original issues in depth if there is a public thread with the discussion :)

I will also give arq and saq a look!

sondrelg commented 2 years ago

All private unfortunately, but it boiled down to one primary issue: for everything we built async, we would need a sync equivalent if we wanted to run it under Celery.

To give a concrete example of what this applied to:

Say you have a payment system you want to interact with. When a request comes into one of your API endpoints, you want to forward some data to the payment system. Most of the time you will build a client to interface with it, and you really want that client to be async (I'm just taking it for granted that you agree running blocking remote HTTP calls from your async web server defeats the purpose of choosing an async framework). The problem arises when you want to create a celery task that interfaces with the same system: now you need a sync client... Basically everything gets duplicated, and that seems like a bad situation to be in if it can be avoided.
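
To make the duplication concrete, it tends to end up looking something like this (a hypothetical payment client, with httpx for both variants):

import httpx


class AsyncPaymentClient:
    """Used from the async web framework (e.g. an endpoint handler)."""

    async def create_charge(self, amount_cents: int) -> dict:
        async with httpx.AsyncClient(base_url='https://payments.example.com') as client:
            response = await client.post('/charges', json={'amount': amount_cents})
            response.raise_for_status()
            return response.json()


class SyncPaymentClient:
    """Near-identical copy, needed only because celery tasks run sync code."""

    def create_charge(self, amount_cents: int) -> dict:
        with httpx.Client(base_url='https://payments.example.com') as client:
            response = client.post('/charges', json={'amount': amount_cents})
            response.raise_for_status()
            return response.json()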

After coming to that conclusion, we thought "we can just write async tasks!" We'll create async tasks, and we'll just make celery run them in their own event loops. We came up with roughly this:

import asyncio
from functools import wraps
from inspect import signature
from typing import Any, Callable

from asgiref.sync import async_to_sync
from celery import Task

# `celery`, `CeleryQueue`, `running_async_to_sync`, and the setup/teardown helpers
# below come from our application code (the Celery app instance, a queue enum,
# a contextvar flag, and connection helpers respectively).


def async_celery_task(func: Callable) -> Task:
    async def setup() -> None:
        await setup_redis()
        await connect_db()

    async def teardown() -> None:
        await disconnect_db()
        await close_redis()

    @wraps(func)
    def inner(self: Task, *args: Any, **kwargs: Any) -> Any:
        async def async_task() -> Any:
            await setup()

            # Check whether the task we're about to execute expects a "self" parameter
            params = {'self': self} if 'self' in signature(func).parameters else {}

            # Check whether the task is sync or async before executing it
            try:
                if asyncio.iscoroutinefunction(func):
                    await func(*args, **(params | kwargs))
                else:
                    func(*args, **(params | kwargs))
            except Exception:
                raise
            finally:
                await teardown()

        running_async_to_sync.set(True)
        async_to_sync(async_task)()
        running_async_to_sync.set(False)

    return celery.task(bind=True, queue=CeleryQueue.MY_QUEUE)(inner)


@async_celery_task
async def my_task():
    await asyncio.sleep(1)

And it was a terrible idea :boom: :smile:

You end up having to spin up an event loop and re-initialize connections (or more likely connection pools) to your db, cache, etc. All of that is horribly expensive computationally, and your payoff is that you only get to run a single async task at a time, which sucks! :laughing:

Again, your situation might be entirely different, and my implementation above might be flawed, so I don't mean to say it's a bad idea. Just thought I would relay my experience in case it saves you some suffering. @JonasKs, how many async tasks are you running at the same time in your workers again?

JonasKs commented 2 years ago

Agree with Sondre. Celery works great as a library; it just sucks that you can't reuse async code.

We process ~20,000 tasks a day with arq on a single worker container. All tasks are I/O heavy. How many run at the same time I'm not sure, but during testing I ran 500 at once without issues.

sondrelg commented 2 years ago

@dapryor are you still working on this? :slightly_smiling_face:

dapryor commented 2 years ago

@sondrelg Yes :) Apologies. I have been busy with work. Will get back to this soon

dapryor commented 2 years ago

@sondrelg I have updated the PR. It includes the signature change from #52 and the extra parameter for allowing the internal celery task id to be used.

I honestly would like some help with the celery test, since the current structure of those tests is async with a session-scoped celery app. That doesn't lend itself well to tests that change the signal functions of the session app. Plus, celery testing is already a huge pain...

sondrelg commented 2 years ago

Looks good, thanks @dapryor 👏

sondrelg commented 2 years ago

The new version (3.1.0) is now released. Would you mind verifying that it works OK for you, @dapryor?

davidpryor commented 1 year ago

@sondrelg Everything seems to be working! Will open an issue if I hit anything.