NicolasLM / spinach

Modern Redis task queue for Python 3
https://spinach.readthedocs.io
BSD 2-Clause "Simplified" License

Running asyncio tasks in the event loop of the main thread #9

Closed lqmanh closed 2 years ago

lqmanh commented 3 years ago

Hi,

It isn't stated anywhere in the docs, so I wonder whether it is possible to use a coroutine function as a Task. Something like this:

@spin.task(name='compute')
async def compute(a, b):
    ...

If it isn't possible yet, could you please consider supporting it?

NicolasLM commented 3 years ago

It is not currently supported, but I definitely have this feature in the back of my mind.

To start thinking about a design, it would be very helpful if you could describe what you are trying to achieve, why...

lqmanh commented 3 years ago

I'm using Spinach in my web app. My tech stack includes FastAPI, Tortoise ORM, HTTPX, etc. These all support async/await out of the box, and I use this syntax quite extensively. It would therefore be convenient to use async/await in Spinach task functions, as in my example above, without any extra work.

Currently I have to use this kind of workaround:

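import asyncio
import functools
from typing import Callable

def coro_wrapper(f: Callable):
    """Turn a coroutine function into a plain function that Spinach can run."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # asyncio.run() creates a fresh event loop for every call and closes it afterwards
        return asyncio.run(f(*args, **kwargs))

    return wrapper

# `spin` is the application's Spinach Engine, defined elsewhere
@spin.task(name='compute')
@coro_wrapper
async def compute(a, b):
    ...
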
juledwar commented 3 years ago

It seems like it would be trivial to define another task that calls this async function and waits for it. I'm not sure what you gain by making the task itself async.

NicolasLM commented 3 years ago

The snippet provided shows the workaround that had to be implemented because Spinach does not support async workers. I imagine the end goal is to have a single thread running N async jobs at once.

The Workers class, which is responsible for running jobs, only interfaces with the Engine through queues, so it doesn't seem too complicated to add an AsyncioWorkers class that would run coroutines natively.
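To make "a single thread running N async jobs at once" more concrete, here is a rough, hypothetical sketch of that shape using only the standard library. Synchronous code pushes jobs into a `queue.Queue` (loosely mirroring how Workers talk to the Engine through queues), and one dedicated thread runs an event loop that pulls from that queue via `asyncio.to_thread`, so the loop itself never blocks; a semaphore caps how many jobs run concurrently. `MiniAsyncioWorkers`, `submit()`, `stop()` and `concurrency` are invented names, not Spinach's API, and Spinach's own proof of concept uses janus rather than this approach. Python 3.9+ is assumed for `asyncio.to_thread`.

```python
import asyncio
import queue
import threading
from typing import Any, Awaitable, Callable, Optional, Set, Tuple

# A job is a coroutine function plus its positional arguments; None means "stop".
Job = Optional[Tuple[Callable[..., Awaitable[Any]], Tuple[Any, ...]]]


class MiniAsyncioWorkers:
    """Hypothetical sketch: one thread, one event loop, up to `concurrency` jobs at once."""

    def __init__(self, concurrency: int = 10) -> None:
        self._jobs: "queue.Queue[Job]" = queue.Queue()
        self.results: "queue.Queue[Any]" = queue.Queue()
        self._concurrency = concurrency
        # The whole event loop lives in this single background thread.
        self._thread = threading.Thread(
            target=lambda: asyncio.run(self._main()), daemon=True
        )
        self._thread.start()

    def submit(self, coro_fn: Callable[..., Awaitable[Any]], *args: Any) -> None:
        """Called from synchronous code, e.g. the side that fetches jobs."""
        self._jobs.put((coro_fn, args))

    def stop(self) -> None:
        """Let already-started jobs finish, then stop the loop and its thread."""
        self._jobs.put(None)
        self._thread.join()

    async def _main(self) -> None:
        sem = asyncio.Semaphore(self._concurrency)
        running: Set["asyncio.Task[None]"] = set()
        while True:
            # The blocking get() runs in a helper thread, so the loop stays free.
            job = await asyncio.to_thread(self._jobs.get)
            if job is None:
                break
            await sem.acquire()  # wait here while `concurrency` jobs are running
            task = asyncio.create_task(self._run(sem, *job))
            running.add(task)
            task.add_done_callback(running.discard)
        await asyncio.gather(*list(running))

    async def _run(self, sem: asyncio.Semaphore, coro_fn, args) -> None:
        try:
            self.results.put(await coro_fn(*args))
        except Exception as exc:  # report failures instead of silently losing them
            self.results.put(exc)
        finally:
            sem.release()
```

With `concurrency=5`, five jobs that each `await asyncio.sleep(0.2)` finish in roughly 0.2 s rather than 1 s, all on the same background thread.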

lqmanh commented 3 years ago

@juledwar Indeed, it's trivial to use a workaround. However, built-in async task support may enable some optimizations.

In my workaround above, calling asyncio.run for every job creates and destroys an event loop each time, which is not very performant.
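One way to avoid creating and tearing down a loop per job is to keep one event loop per worker thread and reuse it with `loop.run_until_complete()`. The sketch below is a hypothetical variant of the `coro_wrapper` workaround (the name `coro_wrapper_reuse` is made up here), not something Spinach provides. The trade-off: unlike `asyncio.run()`, it never closes the loop, cancels leftover tasks, or shuts down async generators between jobs.

```python
import asyncio
import functools
import threading
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")

# Attributes set on this object are private to the thread that sets them.
_local = threading.local()


def _thread_loop() -> asyncio.AbstractEventLoop:
    """Return the calling thread's private event loop, creating it on first use."""
    loop = getattr(_local, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        _local.loop = loop
    return loop


def coro_wrapper_reuse(f: Callable[..., Awaitable[T]]) -> Callable[..., T]:
    """Hypothetical alternative to coro_wrapper that reuses one loop per thread."""

    @functools.wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        # Run the coroutine to completion on this thread's long-lived loop.
        return _thread_loop().run_until_complete(f(*args, **kwargs))

    return wrapper
```

It would be stacked under `@spin.task(...)` exactly like `coro_wrapper`. Consecutive jobs handled by the same worker thread then share one loop, so loop-bound objects (for example a long-lived async HTTP client) could be kept between jobs, while different worker threads still get separate loops.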

NicolasLM commented 3 years ago

FYI, I made a proof of concept yesterday of an AsyncioWorkers class that spawns a single thread running all async tasks. It uses janus as an interface between the sync Engine and the async workers.

The main drawback of this approach is that the scheduling of tasks is still blocking, so async code should use asyncio.to_thread(spin.schedule, task) to schedule tasks without blocking the event loop.

Since the implementation is quite simple and asyncio is now a first class citizen in the Python ecosystem I think it makes sense to support this.
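The `asyncio.to_thread(spin.schedule, task)` suggestion works because `asyncio.to_thread()` (Python 3.9+) runs a blocking callable in the loop's default thread pool while other coroutines keep running. In the sketch below, `schedule_blocking` is a hypothetical stand-in for a blocking call such as `spin.schedule(task)` (it only sleeps), and the `heartbeat` coroutine shows that the event loop stays responsive during the call.

```python
import asyncio
import time
from typing import Tuple


def schedule_blocking(task_name: str) -> str:
    # Hypothetical stand-in for a blocking call such as spin.schedule(task_name).
    time.sleep(0.2)
    return task_name


async def heartbeat(stop: asyncio.Event) -> int:
    """Count how many times the loop got to run this coroutine."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def main() -> Tuple[str, int]:
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    # The blocking call runs in a worker thread, so heartbeat keeps ticking.
    scheduled = await asyncio.to_thread(schedule_blocking, "compute")
    stop.set()
    ticks = await beat
    return scheduled, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling `schedule_blocking("compute")` directly inside `main()` instead would freeze the loop for the whole 0.2 s, and `heartbeat` would not get to run in the meantime.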

lqmanh commented 3 years ago

@NicolasLM I have this question. I'm trying to run Spinach in-process. In tasks I need to use an async library which implicitly uses the main event loop (web app event loop). Whenever Spinach tries to execute a job, "Future attached to a different loop" error is raised. I'm thinking of asyncio.run_coroutine_threadsafe but currently have no idea how to get that main event loop in worker threads.

Could you help me with this? Thanks in advance.

NicolasLM commented 3 years ago

I just pushed my work-in-progress implementing native asyncio workers: https://github.com/NicolasLM/spinach/tree/asyncio-workers

It is not ready for prime time yet, but it works well. Let me know if that solves your issue.

lqmanh commented 3 years ago

@NicolasLM Thanks for the quick feedback, I'll try it out.

lqmanh commented 3 years ago

I've tried out AsyncioWorkers and the "Future attached to a different loop" error still occurred. Skimming through the code, I found that AsyncioWorkers still spawns a separate thread for Janus and launches a new event loop there (via asyncio.run). Would it be sensible to run Janus directly in the main thread? In my case, the web app already provides a running event loop for Janus. In other cases, maybe we could just let users manage the event loop themselves?

NicolasLM commented 3 years ago

In tasks I need to use an async library which implicitly uses the main event loop (web app event loop). Whenever Spinach tries to execute a job, "Future attached to a different loop" error is raised. I'm thinking of asyncio.run_coroutine_threadsafe but currently have no idea how to get that main event loop in worker threads.

My understanding of asyncio is that each thread running async code needs its own event loop that cannot be shared between threads. Most code creates a default event loop if the current thread does not have one yet. This pattern should be compatible with AsyncioWorkers. While I think that Janus is a bit too complicated for our needs, I don't think your problem is related to Janus.
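The "Future attached to a different loop" error can be reproduced without Spinach: an asyncio future belongs to the loop that created it, and awaiting it from a coroutine running on another thread's loop fails with a `RuntimeError`. A minimal sketch using only the standard library (all names are illustrative):

```python
import asyncio
import threading
from typing import List


def reproduce() -> List[str]:
    """Await a future owned by one loop from a coroutine running on another loop."""
    main_loop = asyncio.new_event_loop()
    fut = main_loop.create_future()  # this future belongs to main_loop
    errors: List[str] = []

    async def wait_for_it() -> None:
        await fut  # the worker thread's loop does not own `fut`

    def worker() -> None:
        try:
            # asyncio.run() creates a brand-new loop in this thread
            asyncio.run(wait_for_it())
        except RuntimeError as exc:
            errors.append(str(exc))

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    main_loop.close()
    return errors


if __name__ == "__main__":
    print(reproduce())
```

This is the same situation as a library that captured the web app's loop at startup being called from a Spinach worker thread with its own loop.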

lqmanh commented 3 years ago

My understanding of asyncio is that each thread running async code needs its own event loop that cannot be shared between threads.

For this reason, I don't know whether it's possible or sensible for AsyncioWorkers to run directly in the main thread (currently, the asyncio-workers branch creates another thread and runs a new event loop there).

NicolasLM commented 3 years ago

As it stands, I don't think it's possible to use Spinach in a way that uses the main thread to launch asyncio tasks. But this shouldn't matter, as I don't think asyncio treats the main thread in a special way.

Maybe you can write a minimal example showing the problem you have/are trying to solve?

lqmanh commented 3 years ago

Here's my problem

Not working

import asyncio

async def foo():
    # do complicated things
    loop = asyncio.get_running_loop()  # must be the event loop of the main thread
    # do complicated things

@spin.task(name='bar1')
def bar1():
    # Won't work: this runs in a worker thread, where asyncio.run() creates a new event loop
    asyncio.run(foo())

@spin.task(name='bar2')
async def bar2():
    # AsyncioWorkers won't help either: this also runs in a separate thread (the one with the Janus queues)
    await foo()

Solution 1

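def get_main_thread_event_loop():
    # somehow get the event loop of the main thread; I still cannot work it out
    raise NotImplementedError

@spin.task(name='bar')
def bar():
    # submit foo() to the main thread's loop and block this worker thread until it finishes
    asyncio.run_coroutine_threadsafe(foo(), loop=get_main_thread_event_loop()).result()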
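For Solution 1, one common pattern (sketched here under assumptions, not something Spinach does for you) is to record the main loop once from code that already runs on it, for example a web framework startup hook, and then hand coroutines to it from worker threads with `asyncio.run_coroutine_threadsafe()`. `MAIN_LOOP` and `remember_main_loop()` are hypothetical names, and `asyncio.to_thread(bar)` merely simulates a Spinach worker thread calling the task. The worker thread blocks on `.result()`, which is only safe because it is not the thread running the main loop; doing the same from the main thread would deadlock.

```python
import asyncio
from typing import Optional

# Hypothetical module-level reference to the web app's event loop.
MAIN_LOOP: Optional[asyncio.AbstractEventLoop] = None


def remember_main_loop() -> None:
    """Call once from async code running on the main loop (e.g. a startup hook)."""
    global MAIN_LOOP
    MAIN_LOOP = asyncio.get_running_loop()


async def foo() -> bool:
    # Stand-in for library code that needs the main thread's loop.
    return asyncio.get_running_loop() is MAIN_LOOP


def bar(timeout: float = 5.0) -> bool:
    """Synchronous task body, executed in a worker thread."""
    if MAIN_LOOP is None:
        raise RuntimeError("main event loop has not been recorded yet")
    future = asyncio.run_coroutine_threadsafe(foo(), MAIN_LOOP)
    # Block this worker thread (not the main loop) until foo() finishes.
    return future.result(timeout=timeout)


async def main() -> bool:
    remember_main_loop()
    # Simulate a worker thread running the task while the main loop stays free.
    return await asyncio.to_thread(bar)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here `foo()` reports `True` because it actually executed on the recorded main loop, even though `bar` was called from another thread.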

Solution 2

Have AsyncioWorkers run in the main thread?
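Solution 2 amounts to running the job consumer as a task on the web app's own loop. Per the replies in this thread, Spinach's AsyncioWorkers does not work this way, but the generic asyncio pattern can be sketched as follows: a plain thread (standing in for whatever fetches jobs) hands jobs to the loop with `loop.call_soon_threadsafe()`, since `asyncio.Queue` is not thread-safe, and a consumer coroutine on the main loop awaits them. All names here are hypothetical.

```python
import asyncio
import threading
from typing import Any, Awaitable, Callable, List, Optional, Tuple

# A job is a coroutine function plus its arguments; None is the stop sentinel.
Job = Optional[Tuple[Callable[..., Awaitable[Any]], Tuple[Any, ...]]]


async def consume(jobs: "asyncio.Queue[Job]", results: List[Any]) -> None:
    """Hypothetical in-process consumer running on the web app's own loop."""
    while True:
        job = await jobs.get()
        if job is None:  # sentinel: stop consuming
            break
        coro_fn, args = job
        results.append(await coro_fn(*args))


def feed_from_thread(loop: asyncio.AbstractEventLoop, jobs: "asyncio.Queue[Job]",
                     items: List[Job]) -> None:
    """Runs in a non-async thread, e.g. the code that pulls jobs from a broker."""
    for item in items:
        # asyncio.Queue is not thread-safe, so let the loop perform the put.
        loop.call_soon_threadsafe(jobs.put_nowait, item)
    loop.call_soon_threadsafe(jobs.put_nowait, None)


async def foo(x: int) -> Tuple[int, asyncio.AbstractEventLoop]:
    # Runs on the main loop, so it can use objects bound to that loop.
    return x * 2, asyncio.get_running_loop()


async def main() -> Tuple[List[Any], asyncio.AbstractEventLoop]:
    loop = asyncio.get_running_loop()
    jobs: "asyncio.Queue[Job]" = asyncio.Queue()
    results: List[Any] = []
    consumer = asyncio.create_task(consume(jobs, results))
    feeder = threading.Thread(
        target=feed_from_thread, args=(loop, jobs, [(foo, (1,)), (foo, (2,))])
    )
    feeder.start()
    await consumer
    feeder.join()
    return results, loop
```

Every job here runs on the same loop as `main()`, which is exactly the property the webapp-plus-worker setup needs; this consumer processes jobs one at a time, so it would still need something like a semaphore and `create_task` to run several at once.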

NicolasLM commented 3 years ago

Tell me if I'm understanding correctly what you are trying to achieve: you have a process that already runs asyncio code, let's call it webapp, and you want to add a spinach worker to the process. You also want the webapp and the spinach tasks to somehow interact with each other using the function foo.

lqmanh commented 3 years ago

Yep that's correct.

NicolasLM commented 3 years ago

In this case I don't have a good answer for you, sorry. The current implementation of the asyncio worker requires running all tasks in a separate thread, and therefore in an event loop different from the main thread's.

Keep in mind that most people run Spinach workers in processes that do nothing but run tasks. So even if the current implementation of AsyncioWorkers does not work for your use case, I think it still makes sense as a general way of running asyncio tasks within Spinach.

NicolasLM commented 2 years ago

Closing this issue because this likely won't be supported. Feel free to reopen it if there is a new use case for this feature.