Miksus / rocketry

Modern scheduling library for Python
https://rocketry.readthedocs.io
MIT License
3.23k stars 105 forks

BUG: Support synchronous libraries in tasks when executing in process or thread #168

Open tekumara opened 1 year ago

tekumara commented 1 year ago

Describe the bug

Tasks run on the async event loop even when execution="process".

To Reproduce

import asyncio
from rocketry import Rocketry
from rocketry.conds import hourly

app = Rocketry()

@app.task(hourly, execution="process")
def do_hourly():
    print(asyncio.get_running_loop())

if __name__ == "__main__":
    app.run()

Output:

<_UnixSelectorEventLoop running=True closed=False debug=False>

Some synchronous libraries assert that there is no running event loop and refuse to run otherwise, e.g. Playwright's sync API inside rocketry:

INFO:rocketry.scheduler:Starting up...
INFO:rocketry.scheduler:Startup complete.
INFO:rocketry.task:Task 'do_daily' status: 'run'
ERROR:rocketry.task:Task 'do_daily' status: 'fail'
Traceback (most recent call last):
  File "/Users/tekumara/code/example/.venv/lib/python3.9/site-packages/rocketry/core/task.py", line 597, in _run_as_async
    output = await self.execute(**params)
  File "/Users/tekumara/code/example/.venv/lib/python3.9/site-packages/rocketry/tasks/func.py", line 230, in execute
    output = func(**params)
  File "/Users/tekumara/code/example/webglimpse/scheduler.py", line 22, in do_daily
    webglimpse.browse.screenshot(
  File "/Users/tekumara/code/example/webglimpse/browse.py", line 10, in screenshot
    with sync_playwright() as p:
  File "/Users/tekumara/code/example/.venv/lib/python3.9/site-packages/playwright/sync_api/_context_manager.py", line 44, in __enter__
    raise Error(
playwright._impl._api_types.Error: It looks like you are using Playwright Sync API inside the asyncio loop.
Please use the Async API instead.

Expected behavior

When a task runs in a process or thread, there should be no running event loop.

Additional context

rocketry 2.5.1

tekumara commented 1 year ago

A workaround for now is to use an async task and to_thread from anyio to move sync methods to a thread, e.g.:

import asyncio
from anyio import to_thread

# app and hourly are defined as in the reproduction above

def sync_stuff():
    print(asyncio.get_running_loop())  # raises RuntimeError: no running event loop
    # ... do actual sync stuff here ...

@app.task(hourly, execution="process")
async def do_daily() -> None:
    # run the sync function in a worker thread, outside the event loop
    await to_thread.run_sync(sync_stuff)
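
For anyone who would rather not add anyio as a dependency, the same idea can probably be expressed with the standard library's asyncio.to_thread (Python 3.9+). This is only a minimal sketch and has not been tried inside rocketry: sync_stuff and do_daily are the illustrative names from the workaround, and in a real app do_daily would still carry the @app.task decorator.

```python
import asyncio


def sync_stuff() -> str:
    # Runs in a worker thread. The running loop is tracked per thread,
    # so asyncio.get_running_loop() raises RuntimeError here.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return "no running event loop"
    return "event loop is running"


async def do_daily() -> str:
    # asyncio.to_thread submits the sync callable to the loop's default
    # thread pool executor and awaits its result.
    return await asyncio.to_thread(sync_stuff)


if __name__ == "__main__":
    print(asyncio.run(do_daily()))
```

Sync libraries that only check for a running loop in the current thread should pass that check when called from sync_stuff.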

Miksus commented 1 year ago

Hmm, ye. This is how the async loop is created:

There is only one place where the await keyword is actually needed and it is here where the execution function is called: https://github.com/Miksus/rocketry/blob/214933f1873a7ab148a235222285dd650c263e5c/rocketry/core/task.py#L596-L599

So in principle, we could fix this by having two sets of the methods that actually run the task function: one sync and one async. Then, before we create the loop, we check whether we really need one. However, there is a risk that the method gets duplicated and becomes hard to maintain. Probably needs more splitting.
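
A rough sketch of the "check if we really need a loop" idea described above. This is not rocketry's code: run_task_func, sync_task and async_task are hypothetical names used only for illustration, and the real task runner has more steps around the call.

```python
import asyncio
import inspect


def run_task_func(func, **params):
    """Hypothetical helper: start an event loop only for coroutine functions."""
    if inspect.iscoroutinefunction(func):
        # Async task function: it needs a loop, so create one and run it.
        return asyncio.run(func(**params))
    # Sync task function: call it directly, no loop is created.
    return func(**params)


def sync_task() -> bool:
    # Reports whether an event loop is running in this thread.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def async_task() -> bool:
    # Inside a coroutine there is always a running loop.
    asyncio.get_running_loop()
    return True


if __name__ == "__main__":
    print(run_task_func(sync_task))
    print(run_task_func(async_task))
```

Keeping the branch in one small helper like this is one way to limit the duplication risk mentioned above, since only the final call differs between the sync and async paths.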

djnnvx commented 1 year ago

This example might be useful to you.