robinhood / faust

Python Stream Processing
Other
6.7k stars 539 forks source link

Compatibility with asgiref.sync #689

Open fcurella opened 3 years ago

fcurella commented 3 years ago

Checklist

Steps to reproduce

I'd like to use the Django ORM to save some data into Postgres. Django recommends using asgiref's sync_to_async, but that relies on using async_to_sync above in the stack:

Note, though, that it relies on usage of async_to_sync() above it in the stack to correctly run things on the main thread.

I've tried wrapping worker.start() in async_to_sync(), but it looks like Faust is trying to register signal handlers on the wrong Thread, ie: the thread created by async_to_sync, rather than the main thread.

# models.py
from django.db import models

class Order(models.Model):
    details = models.TextField()
# agents.py
from asgiref.sync import sync_to_async

from app import app
from my_django_app.models import Order

topic = app.topic("my-topic")

@app.agents(topic)
async def process(messages):
    async for message in messages:
        await sync_to_async(Order.objects.create)(details="....")
#!/usr/bin/env python
# __main__.py
import asyncio

from asgiref.sync import async_to_sync

import faust
from faust.worker import Worker

def get_app():
    app = faust.App(
        "myapp",
        broker_url="kafka://localhost:",
        origin="myproject",
        autodiscover=True,
    )
    return app

def main() -> None:
    app = get_app()
    worker = Worker(app, loglevel="INFO")
    try:
        async_to_sync(worker.start)()
    finally:
        worker.stop_and_shutdown()

if __name__ == "__main__":
    main()

Expected behavior

Worker starts successfully.

Actual behavior

Starting up the worker fails with ValueError: set_wakeup_fd only works in main thread.

Full traceback

Traceback (most recent call last):
  File "/Users/flavio.curella/.pyenv/versions/3.8.3/lib/python3.8/asyncio/unix_events.py", line 95, in add_signal_handler
    signal.set_wakeup_fd(self._csock.fileno())
ValueError: set_wakeup_fd only works in main thread

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/flavio.curella/.pyenv/versions/3.8.3/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/Users/flavio.curella/.pyenv/versions/3.8.3/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/Users/flavio.curella/code/mender/pipeline/pipeline/__main__.py", line 24, in <module>
    main()
  File "/Users/flavio.curella/code/mender/pipeline/pipeline/__main__.py", line 17, in main
    async_to_sync(worker.start)()
  File "/Users/flavio.curella/Envs/mender-pipeline/lib/python3.8/site-packages/asgiref/sync.py", line 147, in __call__
    return call_result.result()
  File "/Users/flavio.curella/.pyenv/versions/3.8.3/lib/python3.8/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/Users/flavio.curella/.pyenv/versions/3.8.3/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/Users/flavio.curella/Envs/mender-pipeline/lib/python3.8/site-packages/asgiref/sync.py", line 212, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/Users/flavio.curella/Envs/mender-pipeline/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/flavio.curella/Envs/mender-pipeline/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/flavio.curella/Envs/mender-pipeline/lib/python3.8/site-packages/mode/services.py", line 751, in _actually_start
    await self.on_first_start()
  File "/Users/flavio.curella/Envs/mender-pipeline/lib/python3.8/site-packages/faust/worker.py", line 338, in on_first_start
    await self.default_on_first_start()
  File "/Users/flavio.curella/Envs/mender-pipeline/lib/python3.8/site-packages/mode/worker.py", line 179, in default_on_first_start
    self.install_signal_handlers()
  File "/Users/flavio.curella/Envs/mender-pipeline/lib/python3.8/site-packages/mode/worker.py", line 222, in install_signal_handlers
    self._install_signal_handlers_unix()
  File "/Users/flavio.curella/Envs/mender-pipeline/lib/python3.8/site-packages/mode/worker.py", line 228, in _install_signal_handlers_unix
    self.loop.add_signal_handler(signal.SIGINT, self._on_sigint)
  File "/Users/flavio.curella/.pyenv/versions/3.8.3/lib/python3.8/asyncio/unix_events.py", line 97, in add_signal_handler
    raise RuntimeError(str(exc))
RuntimeError: set_wakeup_fd only works in main thread

Versions

jurrian commented 1 year ago

I was having the same problem, eventually I solved it by using the app.loop, so you use the same loop as the app:

async def article_changed(article_id, is_new):
    future = await topic.send(value=ChangeRecord(id=article_id, is_new=is_new))
    return await future  # For some reason it does not work without awaiting twice

So from a model save you can trigger this async this way:

def save(self, *args, **kwargs):
    ...
    loop = app.loop
    coroutine = article_changed(article_id=self.id, is_new=is_new)
    loop.run_until_complete(coroutine)