airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
3.18k stars 162 forks source link

Bug: FastAPI throws PydanticUndefinedAnnotation error when starting the broker in FastAPI lifespan #1920

Closed tcmRyan closed 1 week ago

tcmRyan commented 1 week ago

Describe the bug After creating a RabbitRouter and calling broker.start() inside the FastAPI lifespan, Faststream throws an import error in faststream/broker/fastapi/get_dependant.py", line 32, in get_fastapi_native_dependant

File "/app/.venv/lib/python3.12/site-packages/pydantic/_internal/_generate_schema.py", line 866, in _resolve_forward_ref
    raise PydanticUndefinedAnnotation.from_name_error(e) from e
 pydantic.errors.PydanticUndefinedAnnotation: name 'AnyDict' is not defined

How to reproduce Include source code:

    @asynccontextmanager
    async def lifespan(app: FastAPI):

        await rabbit_router.broker.start()

        yield

        rabbit_router.broker.close()

    app = FastAPI(lifespan=lifespan)

And/Or steps to reproduce the behavior:

  1. ...

Expected behavior Fast API would not throw a Pydantic Error when starting the broker.

Observed behavior Fast API throws a PydanticUndefinedAnnotation error

Screenshots If applicable, attach screenshots to help illustrate the problem.

Environment Include the output of the faststream -v command to display your current project and system environment.

Additional context Provide any other relevant context or information about the problem here.

Lancetnik commented 1 week ago

@tcmRyan sorry, but I don't think, that the problem is in lifespan - FastStream doesn't analize this method

Also, current snippet doesn't reproduce the problem

from contextlib import asynccontextmanager

from fastapi import FastAPI

from faststream.rabbit.fastapi import RabbitRouter

rabbit_router = RabbitRouter()

@asynccontextmanager
async def lifespan(app: FastAPI):
    await rabbit_router.broker.start()
    yield
    rabbit_router.broker.close()

app = FastAPI(lifespan=lifespan)
app.include_router(rabbit_router)

Can you try to localize the problem and share MRE with me?

tcmRyan commented 1 week ago

Closing the issue. Looks like this was because I had an issue in a decorator I have elsewhere to do custom ack/nack

Lancetnik commented 1 week ago

@tcmRyan did you solve your problem?

tcmRyan commented 1 week ago

TLDR; Yup, just had to delete some code, which always feels nice.

I wrote a wrapper around Faststream so that I can use rabbit with Fanout exchanges similar to the Blinker library. My issue was when I tried to leave space for custom ack/hack work, but since Im not doing anything the Faststream doesn't already do, Im just going to punt on the problem.

This was the code causing the issue:

    def subscribe(self, subscriber, broker):
        broker.subscriber(subscriber.queue, self.exchange)(self.handler(subscriber.fn))

    @staticmethod
    def handler(fn):
        async def base_handler(body: Any, msg: RabbitMessage):
            try:
                await fn(body)
                await msg.ack()
            except Exception:
                await msg.nack()
        return base_handler