faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/
Other
1.65k stars 183 forks source link

Can't use FastAPI + Faust-streaming version above 0.10.1 - attached to a different loop #448

Open harochau opened 1 year ago

harochau commented 1 year ago

Thanks for all the people continuing to support this project, unfortunately I run into a critical bug in my system:

Checklist

Steps to reproduce

  1. fastapi_app.py - FastAPI app started with uvicorn.
  2. faust_app.py - Faust app started with python faust_app.py worker -l info
  3. both processes running in the same docker container.
  4. Faust-streaming any version above 0.10.1 (checked 0.10.2, 0.10.3, 0.10.4)

Expected behavior

I can import a topic created in faust_app.py into fastapi_app.py and use await my_topic.send(value={...}) inside of FastAPI async endpoints (or use an async function that does the same). Everything works OK in faust-streaming version 0.10.1 and below.

Actual behavior

AssertionError: Please create objects with the same loop as running with or RuntimeError: Task attached to a different loop looks like the 1st attempt will result in the 1st error, and further attempts result in the second error.

Full traceback

in production k8s

....
  File "/app/my_service/fastapi_app.py", line 70, in send_my_message_endpoint
    await send_my_message(my_message)
  File "/app/my_service/faust_app.py", line 71, in send_my_message
    await my_message_topic.send(value=my_message.json())
  File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 189, in send
    return await self._send_now(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/faust/channels.py", line 314, in _send_now
    return await self.publish_message(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 429, in publish_message
    await producer.send(
  File "/usr/local/lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 1360, in send
    await transaction_producer.send(
  File "/usr/local/lib/python3.11/site-packages/aiokafka/producer/producer.py", line 442, in send
    await self.client._wait_on_metadata(topic)
  File "/usr/local/lib/python3.11/site-packages/aiokafka/client.py", line 639, in _wait_on_metadata
    await self.force_metadata_update()
RuntimeError: Task <Task pending name='Task-50' coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py:407> cb=[set.discard()]> got Future <Future pending cb=[shield.<locals>._outer_done_callback() at /usr/local/lib/python3.11/asyncio/tasks.py:898]> attached to a different loop

in local docker-compose

ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 407, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.11/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 103, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 101, in __call__
    await self.app(scope, receive, send_wrapper)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 92, in __call__
    await self.simple_response(scope, receive, send, request_headers=headers)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 147, in simple_response
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 235, in app
    raw_response = await run_endpoint_function(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 161, in run_endpoint_function
    return await dependant.call(**values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/my_service/fastapi_app.py", line 70, in send_my_message_endpoint
    await send_my_message(my_message)
  File "/app/my_service/faust_app.py", line 71, in send_my_message
    await my_message_topic.send(value=my_message.json())
  File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 189, in send
    return await self._send_now(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/faust/channels.py", line 314, in _send_now
    return await self.publish_message(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 407, in publish_message
    producer = await self._get_producer()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/faust/topics.py", line 384, in _get_producer
    return await self.app.maybe_start_producer()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/mode/utils/futures.py", line 58, in __call__
    result = await self.fun(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/faust/app/base.py", line 1569, in maybe_start_producer
    await producer.maybe_start()
  File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 859, in maybe_start
    await self.start()
  File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 800, in start
    await self._default_start()
  File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 807, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.11/site-packages/mode/services.py", line 824, in _actually_start
    await self.on_start()
  File "/usr/local/lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 1300, in on_start
    await producer.start()
  File "/usr/local/lib/python3.11/site-packages/aiokafka/producer/producer.py", line 303, in start
    assert self._loop is get_running_loop(), (
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: Please create objects with the same loop as running with

Versions

patkivikram commented 1 year ago

faust-streaming has aiohttp embedded in it and you should use that instead of fast-api

harochau commented 1 year ago

aiohttp

  1. Before I could use any web framework before, and it worked.
  2. Aiohttp is nowhere as good as FastAPI. Faust probably should ditch faust.web (aiohttp) anyway and let users plug-in any framework they like

I have Faust services where I only use built-in aiohttp for web interface, but it is cumbersome at best.

harochau commented 1 year ago

After some instigation, I was able to make my services work with 0.10.2, 0.10.3, 0.10.4 versions I need to start my fastapi app from command line or from separate python file like run.py, not from fastapi_app.py Moving the code snippet blow from fastapi_app.py to separate python file like run.py and changing entry point seem to fix the issue. Probably it's because of double import and double creation of faust app/kafka producer.

if __name__ == "__main__":
    import uvicorn

    uvicorn.run('fastapi_app:app', host=..., port=..., reload=True)

I guess the changes from 0.10.2 should be at least a minor patch, as it breaks some workflows.

wbarnha commented 11 months ago

Thanks for pursuing this, I'll take a stab at expanding upon a better solution in https://github.com/faust-streaming/faust/blob/master/examples/fastapi_example.py.

I do think that we should explore replacing aiohttp with fastapi. It's just a matter of finding the time to do so...