airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.14k stars 101 forks source link

Bug:Channel is not accessible when setup_state=False #989

Closed kgadek93 closed 7 months ago

kgadek93 commented 7 months ago

Describe the bug

Channel is not accessible when setup_state=False. When i try to publish message i see this error: "RabbitBroker channel is not started yet". Here is full traceback:

Traceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/anyio/streams/memory.py\", line 98, in receive\n    return self.receive_nowait()\n           ^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/anyio/streams/memory.py\", line 93, in receive_nowait\n    raise WouldBlock\nanyio.WouldBlock\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py\", line 78, in call_next\n    message = await recv_stream.receive()\n              ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/anyio/streams/memory.py\", line 118, in receive\n    raise EndOfStream\nanyio.EndOfStream\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/code/src/main_fastapi.py\", line 87, in http_middleware\n    response = await call_next(request)\n               ^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py\", line 84, in call_next\n    raise app_exc\n  File \"/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py\", line 70, in coro\n    await self.app(scope, receive_or_disconnect, send_no_error)\n  File \"/usr/local/lib/python3.11/site-packages/starlette_context/middleware/raw_middleware.py\", line 92, in __call__\n    await self.app(scope, receive, send_wrapper)\n  File \"/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py\", line 108, in __call__\n    response = await self.dispatch_func(request, call_next)\n               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/code/src/middleware/add_spans_attrs_middleware.py\", line 37, in dispatch\n    response = await call_next(request)\n               ^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py\", line 84, in call_next\n    raise app_exc\n  File \"/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py\", line 70, in coro\n    await self.app(scope, receive_or_disconnect, send_no_error)\n  File \"/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py\", line 79, in __call__\n    raise exc\n  File \"/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py\", line 68, in __call__\n    await self.app(scope, receive, sender)\n  File \"/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py\", line 20, in __call__\n    raise e\n  File \"/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py\", line 17, in __call__\n    await self.app(scope, receive, send)\n  File \"/usr/local/lib/python3.11/site-packages/starlette/routing.py\", line 718, in __call__\n    await route.handle(scope, receive, send)\n  File \"/usr/local/lib/python3.11/site-packages/starlette/routing.py\", line 276, in handle\n    await self.app(scope, receive, send)\n  File \"/usr/local/lib/python3.11/site-packages/starlette/routing.py\", line 66, in app\n    response = await func(request)\n               ^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/fastapi/routing.py\", line 273, in app\n    raw_response = await run_endpoint_function(\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/fastapi/routing.py\", line 190, in run_endpoint_function\n    return await dependant.call(**values)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/code/src/woflow/router.py\", line 47, in create_woflow_import\n    await rabbit_broker.publish(\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 469, in publish\n    assert self._producer, \"RabbitBroker channel is not started yet\"  # nosec B101\n    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\nAssertionError: RabbitBroker channel is not started yet"

How to reproduce Include source code:

from faststream import FastStream
app = FastAPI(
    title="my-service",
    version=settings.RELEASE_VERSION,
)

router = RabbitRouter(
        f"amqp://{settings.RABBITMQ.USER}:{settings.RABBITMQ.PASS}@{settings.RABBITMQ.HOST}:{settings.RABBITMQ.PORT}",
        include_in_schema=True,
        setup_state=False,
    )
app.include_router(router, prefix=BASE_URL, tags=["router"])

@router.post(
    f"{BASE_URL}/",
    summary="Create a new message",
    response_description=NO_CONTENT_RESPONSE_DESCRIPTION,
    status_code=status.HTTP_200_OK,
)
async def create_msg(
    rabbit_broker: Annotated[RabbitBroker, Depends(broker)],
    request: Request,
):  # type: ignore
    await rabbit_broker.publish(
        ImportMessage(
            msg="msg"
        ),
        queue=QUEUE_NAME,
        persist=True,
    )
    return "OK"

...

And/Or steps to reproduce the behavior:

  1. ...

Expected behavior Explain what you expected to happen clearly and concisely. Message should be added successfully to queue. Observed behavior Message wasn't added into the queue and it's raising an error. Screenshots If applicable, attach screenshots to help illustrate the problem.

Environment Include the output of the faststream -v command to display your current project and system environment. I'm using faststream so i run server like that: uvicorn main_fastapi:app --host 0.0.0.0 --reload --port 9080

Additional context Provide any other relevant context or information about the problem here.

Lancetnik commented 7 months ago

@kgadek93 you just forgot to setup your router with FastAPI lifespan. It is an important part (and, unfortunatelly, required one) until tiangolo doesn't merge this PR

Correct code shoud looks like:

from fastapi import FastAPI
from faststream.rabbit.fastapi import RabbitRouter

router = RabbitRouter(...)
app = FastAPI(..., lifespan=router.lifespan_context)  # you forgot about this one
app.include_router(router, ...)
kgadek93 commented 6 months ago

Hi, @Lancetnik I've check that and I've added lifespan like this:

app = FastAPI(
    title="my-service",
    middleware=middlewares,
    version=settings.RELEASE_VERSION,
    dependencies=[Depends(initialize_context)],
    lifespan=Lifespans([database_lifespan, import_router.lifespan_context]),
)

but I still see this issue:

fastapi_1                | {"caller": "/usr/local/lib/python3.11/site-packages/uvicorn/lifespan/on.py:121", "ts": "2024-01-04T13:58:24.317656+00:00", "level": "ERROR", "message": "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 455, in connect\n    reader, writer = await asyncio.open_connection(\n                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/asyncio/streams.py\", line 48, in open_connection\n    transport, _ = await loop.create_connection(\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"uvloop/loop.pyx\", line 1978, in create_connection\nsocket.gaierror: [Errno -5] No address associated with hostname\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/starlette/routing.py\", line 677, in lifespan\n    async with self.lifespan_context(app) as maybe_state:\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 210, in __aenter__\n    return await anext(self.gen)\n           ^^^^^^^^^^^^^^^^^^^^^\n  File \"/code/src/config/lifespans.py\", line 30, in _lifespan_manager\n    await exit_stack.enter_async_context(lifespan(app))\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 650, in enter_async_context\n    result = await _enter(cm)\n             ^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 210, in __aenter__\n    return await anext(self.gen)\n           ^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/fastapi/router.py\", line 334, in start_broker_lifespan\n    await self.broker.start()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 287, in start\n    await super().start()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py\", line 89, in start\n    await self.connect()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 204, in connect\n    connection = await super().connect(*args, **kwargs)\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py\", line 320, in connect\n    self._connection = await self._connect(**_kwargs)\n                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 233, in _connect\n    await aio_pika.connect_robust(\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 338, in connect_robust\n    await connection.connect(timeout=timeout)\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 184, in connect\n    await self.__fail_fast_future\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 139, in __connection_factory\n    await Connection.connect(self, self.__connect_timeout)\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/connection.py\", line 118, in connect\n    self.transport = await UnderlayConnection.connect(\n                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/abc.py\", line 666, in connect\n    connection = await cls.make_connection(\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/abc.py\", line 654, in make_connection\n    connection: aiormq.abc.AbstractConnection = await asyncio.wait_for(\n                                                ^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/asyncio/tasks.py\", line 452, in wait_for\n    return await fut\n           ^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 918, in connect\n    await connection.connect(client_properties or {})\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/base.py\", line 164, in wrap\n    return await self.create_task(func(self, *args, **kwargs))\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/abc.py\", line 44, in __inner\n    return await self.task\n           ^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 462, in connect\n    raise AMQPConnectionError(*e.args) from e\naiormq.exceptions.AMQPConnectionError: [Errno -5] No address associated with hostname\n", "error": "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 455, in connect\n    reader, writer = await asyncio.open_connection(\n                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/asyncio/streams.py\", line 48, in open_connection\n    transport, _ = await loop.create_connection(\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"uvloop/loop.pyx\", line 1978, in create_connection\nsocket.gaierror: [Errno -5] No address associated with hostname\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/starlette/routing.py\", line 677, in lifespan\n    async with self.lifespan_context(app) as maybe_state:\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 210, in __aenter__\n    return await anext(self.gen)\n           ^^^^^^^^^^^^^^^^^^^^^\n  File \"/code/src/config/lifespans.py\", line 30, in _lifespan_manager\n    await exit_stack.enter_async_context(lifespan(app))\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 650, in enter_async_context\n    result = await _enter(cm)\n             ^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/contextlib.py\", line 210, in __aenter__\n    return await anext(self.gen)\n           ^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/fastapi/router.py\", line 334, in start_broker_lifespan\n    await self.broker.start()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 287, in start\n    await super().start()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py\", line 89, in start\n    await self.connect()\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 204, in connect\n    connection = await super().connect(*args, **kwargs)\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py\", line 320, in connect\n    self._connection = await self._connect(**_kwargs)\n                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/faststream/rabbit/broker.py\", line 233, in _connect\n    await aio_pika.connect_robust(\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 338, in connect_robust\n    await connection.connect(timeout=timeout)\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 184, in connect\n    await self.__fail_fast_future\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/robust_connection.py\", line 139, in __connection_factory\n    await Connection.connect(self, self.__connect_timeout)\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/connection.py\", line 118, in connect\n    self.transport = await UnderlayConnection.connect(\n                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/abc.py\", line 666, in connect\n    connection = await cls.make_connection(\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aio_pika/abc.py\", line 654, in make_connection\n    connection: aiormq.abc.AbstractConnection = await asyncio.wait_for(\n                                                ^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/asyncio/tasks.py\", line 452, in wait_for\n    return await fut\n           ^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 918, in connect\n    await connection.connect(client_properties or {})\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/base.py\", line 164, in wrap\n    return await self.create_task(func(self, *args, **kwargs))\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/abc.py\", line 44, in __inner\n    return await self.task\n           ^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/aiormq/connection.py\", line 462, in connect\n    raise AMQPConnectionError(*e.args) from e\naiormq.exceptions.AMQPConnectionError: [Errno -5] No address associated with hostname\n", "data": {"queue": "", "exchange": "", "message_id": ""}}
fastapi_1                | {"caller": "/usr/local/lib/python3.11/site-packages/uvicorn/lifespan/on.py:57", "ts": "2024-01-04T13:58:24.317850+00:00", "level": "ERROR", "message": "Application startup failed. Exiting.", "error": "Application startup failed. Exiting.", "data": {"queue": "", "exchange": "", "message_id": ""}}
Lancetnik commented 6 months ago

@kgadek93

No address associated with hostname

Read the error message, please. You are trying to connect to not-existed RMQ instance

kgadek93 commented 6 months ago

hi, @Lancetnik perhaps I have formulated the problem incorrectly... I would like to setup rabbitmq when app is starting, but if sth is wrong E.g RMQ doesn't exist I would like to log this and start app anyway. Rabbitmq is not crucial for my application. It there any param in lifespan_context for that?

Lancetnik commented 6 months ago

Nope, but you can create your custom lifespan and start broker manually to reach that behavior

kgadek93 commented 6 months ago

ok @Lancetnik I get it, thank you for your reply