airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.37k stars 118 forks source link

Bug: publishing on RabbitBroker hangs forever when connection lost #1579

Closed sfran96 closed 1 month ago

sfran96 commented 2 months ago

Describe the bug If an application uses a RabbitBroker broker to publish messages, the application establishes the connection correctly but after the fact the connection is lost (e.g., broker goes offline) the publishing logic hangs for as long as the broker is offline and then attempts to publish all stuck messages at once.

How to reproduce Follow the instructions in the code snippet.

# example.py
"""
How to reproduce the issue:
1. Start the FastAPI app (`uvicorn example:fastapi_app --host 0.0.0.0 --port 8181`)
2. Stop the broker
3. Send a POST request to http://localhost:8181/run-job (`curl -X 'POST' 'http://localhost:8181/run-job'`) and watch it hang
"""
from contextlib import asynccontextmanager

from fastapi import FastAPI
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker(
    f"amqp://guest:guest@localhost:5672",
    max_consumers=1,
)

@asynccontextmanager
async def lifespan(app):
    await broker.start()
    yield
    await broker.close()

fastapi_app = FastAPI(
    lifespan=lifespan
)
faststream_app = FastStream(
    broker=broker
)

async def run_job(message):
    print(f"Message processed: {message}")

broker.subscriber("run_job")(run_job)

async def run_job_http_endpoint():
    await broker.publish(queue="run_job", message={"job": "run"})
    return {"message": "Job processed successfully"}

fastapi_app.post("/run-job")(run_job_http_endpoint)

Expected behavior When publishing, if the connection is in an invalid state, the operation fails fast raising some sort of exception.

Observed behavior When publishing, if the connections is lost after its established correctly, the operation hangs forever.

Environment

$ faststream -v
Running FastStream 0.5.11 with CPython 3.10.12 on Darwin

Additional context This looks like its linked to an open issue with aio-pika that could be the source of this one, because a check to the connection's state prior to publishing mitigates the issue, i.e., if broker._connection.connected.is_set() is False.

Lancetnik commented 2 months ago

@sfran96 thx for such detail report! Indeed, seems like the problem on aio-pika side: I reported Issue and pinged Mosquito already. Also I am planning to help him with a fix, when we discuss, how the problem should be fixed. Wait a bit for it, please

Lancetnik commented 1 month ago

Should be closed with tomorrow aio-pika release

gaby commented 1 month ago

AioPika fix was released today v9.4.3