procrastinate-org / procrastinate

PostgreSQL-based Task Queue for Python
https://procrastinate.readthedocs.io/
MIT License

Creating an asyncio loop in a sync task #1124

Closed ewjoachim closed 4 weeks ago

ewjoachim commented 1 month ago

One of my colleagues had a [...] problem. He didn't know that the worker runs under asyncio and used asyncio.run in a synchronous task to create a new event loop, leading to a similar error. The worker calls sync_to_async with the default thread_sensitive set to True. I made a quick and dirty attempt, set thread_sensitive to False, and our error was gone (in the end, we rewrote the code of the task to be async). I wonder what the (negative) implications would be if we changed the worker code to use thread_sensitive=False.

Originally posted by @medihack in https://github.com/procrastinate-org/procrastinate/issues/609#issuecomment-2245449737
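For context, the failure mode can be reproduced with plain asyncio and no procrastinate at all. This is a hedged sketch (the task body is made up): a synchronous function that calls asyncio.run while an event loop is already running in its thread gets a RuntimeError, which is the class of error described above.

```python
import asyncio

def sync_task():
    # Hypothetical sync task body: starting a fresh event loop fails
    # when one is already running in the current thread.
    try:
        asyncio.run(asyncio.sleep(0))
        return "ok"
    except RuntimeError as exc:
        return str(exc)

async def worker():
    # Simulates the thread_sensitive=True situation where the sync task
    # ends up executing in the same thread as the worker's event loop.
    return sync_task()

print(asyncio.run(worker()))
# The message explains that asyncio.run() cannot be called
# from a running event loop.
```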

ewjoachim commented 1 month ago

Asgiref says:

If thread_sensitive is passed, the code will run in the same thread as any outer code. This is needed for underlying Python code that is not threadsafe (for example, code which handles SQLite database connections).

So I believe it would be fine most of the time except when people would have used thread unsafe code (especially, code that wouldn't support being called from different threads).

So 1/ changing the default might break a very small number of use cases (I'd say), but it would make life easier for the very small number of users who want to run async code within sync tasks. No idea if it would do more good than harm.

2/ it might be worth reviewing the docs to see if we could make it more obvious what to do in this situation. We could also catch the error and emit a specific error message.
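To make the thread_sensitive distinction concrete, here is a stdlib-only sketch (not asgiref itself) that emulates the two modes: a single shared one-worker executor stands in for thread_sensitive=True (all "sensitive" sync calls serialized onto one thread), while the default thread pool stands in for thread_sensitive=False (calls free to land on other threads).

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# Single shared thread: roughly what thread_sensitive=True gives you --
# every "sensitive" sync call is serialized onto the same thread.
sensitive_executor = ThreadPoolExecutor(max_workers=1)

def current_thread_name():
    return threading.current_thread().name

async def main():
    loop = asyncio.get_running_loop()
    # thread_sensitive=True analogue: both calls land on the same thread.
    a = await loop.run_in_executor(sensitive_executor, current_thread_name)
    b = await loop.run_in_executor(sensitive_executor, current_thread_name)
    # thread_sensitive=False analogue: the call goes to the default pool
    # and runs on a different thread.
    c = await asyncio.to_thread(current_thread_name)
    return a, b, c

a, b, c = asyncio.run(main())
print(a == b, a != c)
```

Thread-unsafe user code (the SQLite example from the asgiref docs) relies on the "same thread every time" guarantee of the first mode, which is what changing the default could break.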

medihack commented 1 month ago

Is 1/ really that problematic when the user code of a task runs isolated in its own thread? Maybe with regard to a passed context!? Also, if I understand the asgiref documentation correctly, nothing runs concurrently anymore when using sync tasks with the worker's concurrency option (>1), as all fetched jobs run consecutively in the same thread. I am just thinking about how we could reach a more pleasant way to handle sync tasks. But yes, if there is no way to do that, we should add something to the docs.

ewjoachim commented 1 month ago

I think (but I could be wrong) that even if passing thread_sensitive=True fixed the issue in a simple case, I'm not sure there's a compelling reason to believe it will be completely fine and won't run into other issues down the line. I think it makes more sense to encourage people to declare async tasks or use sync_to_async rather than asyncio.run.

medihack commented 1 month ago

I think (but I could be wrong) that even if passing thread_sensitive=True fixed the issue in a simple case

You meant thread_sensitive=False 😉. Sure, I will add a hint to the docs when I find some time.

ewjoachim commented 1 month ago

(yep sorry)

onlyann commented 1 month ago

Would there be issues within procrastinate itself if thread_sensitive=False was passed?

From my understanding, that should only impact user code. In that case, it may be reasonable to continue to default to thread_sensitive=True but offer the option to set it to False at the application (worker) level? Maybe even at the task level?

ewjoachim commented 1 month ago

If we want to do it, task level seems to be right.

To summarize:

If it turns out that even with thread_sensitive=False we can get obscure failures without doing anything strange, will there still be a benefit to setting it?

onlyann commented 1 month ago

Maybe the best course of action at this time is improved documentation, and to consider introducing the option when there is demand for it?

medihack commented 1 month ago
  • I think it's best to let the user know as much as possible that there is an async loop running in the main thread and that they can use it for async tasks (> review and adjust the doc for that)

Yes, absolutely. But many people stop after reading the "Quickstart," where we use a sync task (which therefore looks like the recommended way).

  • if the user didn't read the doc and launches an event loop and it fails because there's already one running in the thread, it would be nice to catch this specific error and direct them to the doc

I am not sure we can really catch all associated errors. When, for example, the user creates a new thread in the sync task and starts a new event loop there, no error is raised. But one will be raised as soon as sync_to_async is used in this new event loop ("Single thread executor already being used, would deadlock"). I know it sounds like an unlikely scenario, but it happened in our case 😂. I looked around a bit at what other (non-asyncio) distributed task libraries (like Celery or Dramatiq) do. In concurrency mode, most of them, by default, start each task in its own isolated process.

If it turns out that even with thread_sensitive=False we can get obscure failures without doing anything strange, will there still be a benefit to setting it?

The thing I dislike about the current way we deal with sync tasks (besides the fact that the user can't create a new event loop if they want to) is that we don't run them concurrently (when setting the worker's concurrency option to something > 1), but sequentially in the main thread (if I understand it correctly).

ewjoachim commented 1 month ago

The thing I dislike about the current way we deal with sync tasks (besides the fact that the user can't create a new event loop if they want to) is that we don't run them concurrently (when setting the worker's concurrency option to something > 1), but sequentially in the main thread (if I understand it correctly).

Oh! And with thread_sensitive=False, different tasks are executed in their own thread and therefore (somewhat) concurrent? I had missed that bit. Of course, it makes much more sense if it's this way (let's test this). If so, I'm sold.

medihack commented 1 month ago

Oh! And with thread_sensitive=False, different tasks are executed in their own thread and therefore (somewhat) concurrent? I had missed that bit. Of course, it makes much more sense if it's this way (let's test this). If so, I'm sold.

From reading the docs and doing some quick experiments, it seems to be the case, but I could be wrong. In those experiments, the above-mentioned "Single thread executor already being used, would deadlock" error also went away when using thread_sensitive=False, as the sync_to_async methods are no longer nested on the main thread.

If we choose to use thread_sensitive=False, then I wonder if we should get rid of sync_to_async in the worker altogether and just create a new thread directly, or maybe even start a new process (or make it configurable like Huey does, of course only for sync tasks then).
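For illustration, here is a minimal sketch of what "create a new thread directly" could look like while still awaiting the result from the worker's loop and preserving contextvars (the helper name run_sync_task_in_thread and the task body are made up, and run_in_executor(None, ...) actually uses the loop's default thread pool rather than a brand-new thread):

```python
import asyncio
import contextvars

# Hypothetical stand-in for a user's sync task.
def sync_task(x):
    return x * 2

async def run_sync_task_in_thread(func, *args):
    # Copy the current context (as sync_to_async does) and hand the
    # call off to a pool thread, awaiting the result from the loop.
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    return await loop.run_in_executor(None, lambda: ctx.run(func, *args))

result = asyncio.run(run_sync_task_in_thread(sync_task, 21))
print(result)  # 42
```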

medihack commented 1 month ago

I checked the problem by adding a simple demo task that uses a counter. Indeed, the sync task in concurrency mode is not handled concurrently (in contrast to the same async task).
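A rough stdlib-only approximation of that experiment (hypothetical task, timing-based, not procrastinate itself): two blocking tasks serialized through one shared thread, which emulates the thread_sensitive=True behavior, take roughly twice as long as the same two tasks offloaded to separate threads.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def sync_task():
    # Stand-in for a blocking sync task.
    time.sleep(0.2)

async def timed(runner):
    start = time.monotonic()
    await runner()
    return time.monotonic() - start

async def main():
    loop = asyncio.get_running_loop()
    shared = ThreadPoolExecutor(max_workers=1)  # thread_sensitive=True analogue

    async def serialized():
        await asyncio.gather(
            loop.run_in_executor(shared, sync_task),
            loop.run_in_executor(shared, sync_task),
        )

    async def concurrent():
        await asyncio.gather(
            asyncio.to_thread(sync_task),  # thread_sensitive=False analogue
            asyncio.to_thread(sync_task),
        )

    return await timed(serialized), await timed(concurrent)

t_serial, t_concurrent = asyncio.run(main())
print(t_serial > t_concurrent)  # ~0.4s vs ~0.2s
```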

@ewjoachim Somehow, I think we should get rid of sync_to_async and just create a new thread directly. I don't see any advantage of sync_to_async when using thread_sensitive=False anymore, but maybe I am missing something. On the other hand, we could just use thread_sensitive=False for now, switch later to a native thread, and introduce an option like --sync-worker-type to also allow the user to switch from thread to process for sync tasks (which is what I would prefer). What do you think?

@onlyann Should we tackle this during worker refactoring or in a separate PR?

onlyann commented 1 month ago

It would be preferable to keep it separate as the worker refactoring is already quite large in scope.

Something else that sync_to_async does is that it copies contextvars into the new thread. If thread_sensitive=False is the new default, then https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread can be considered. sync_to_async could be limited to Python 3.8 for the few extra months of support it has left.
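A small sketch (Python 3.9+, variable names made up) showing that asyncio.to_thread does copy the caller's contextvars into the worker thread, so that part of sync_to_async's behavior would be preserved:

```python
import asyncio
import contextvars

# asyncio.to_thread runs the function in a pool thread within a copy
# of the caller's contextvars.Context.
request_id = contextvars.ContextVar("request_id", default=None)

def sync_task():
    # Reads the context variable set by the async caller.
    return request_id.get()

async def main():
    request_id.set("abc-123")
    return await asyncio.to_thread(sync_task)

value = asyncio.run(main())
print(value)  # abc-123
```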

Spawning separate processes is a lot more involved.

If the user code has the ability to define an async top level function for the task, is there something that stops them from doing what they want from that function (such as calling sync code in a separate thread, process, ...)?

medihack commented 1 month ago

Something else that sync_to_async does is that it copies contextvars into the new thread. If thread_sensitive=False is the new default, then https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread can be considered. sync_to_async could be limited to Python 3.8 for the few extra months of support it has left.

Spawning separate processes is a lot more involved.

If the user code has the ability to define an async top level function for the task, is there something that stops them from doing what they want from that function (such as calling sync code in a separate thread, process, ...)?

Sure, a user can do anything they want within an async task. But if we say that we want to support sync tasks, then we should at least provide a sensible default. How about using thread_sensitive=False for now, and maybe later switching to asyncio.to_thread when support for Python 3.8 is dropped? And leave spawning processes to the user.

ewjoachim commented 1 month ago

OK for thread_sensitive=False.

Dropping sync_to_async entirely: I wonder if that plays well with the Django integration. I think it likely should, but before we switched to asgiref, the sync/async situation was rather complicated, so...