airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0

Bug: Inconsistent bugs while using protobuf with decoder #1558

Open moono opened 6 days ago

moono commented 6 days ago

Describe the bug: I'm trying to use RabbitMQ as the message queue and protobuf as the message container. I was following the examples in the docs.

When I use a custom decoder together with other context dependencies such as Logger, the handler works on some runs and fails on others, with no code changes in between.

How to reproduce Include source code:

from faststream import FastStream, NoCast, Logger
from faststream.rabbit import RabbitBroker, RabbitMessage

from data_pb2 import Inputs

broker = RabbitBroker("amqp://guest:guest@<some-rabbitmq-url>:5672/")
app = FastStream(broker)

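def generate_data(user_id: int, user_name: str, fn: str) -> bytes:
    # Build the protobuf message and serialize it to bytes.
    container = Inputs()
    container.id = user_id
    container.name = user_name
    # Read the image with a context manager so the file handle is closed.
    with open(fn, "rb") as f:
        container.image = f.read()
    return container.SerializeToString()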

@app.after_startup
async def test_publish():
    fn = "./1111.png"
    message = generate_data(1001, "John Doe", fn)
    await broker.publish(message, queue="in-queue")

async def decode_message(msg: RabbitMessage) -> Inputs:
    decoded = Inputs()
    decoded.ParseFromString(msg.body)
    return decoded

@broker.subscriber("in-queue", decoder=decode_message)
async def handle_msg(body: NoCast[Inputs], logger: Logger) -> str:
    logger.info(f"User: {body.name} - {body.id} registered")
    return f"User: {body.name} - {body.id} registered"

...
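
One way to narrow this down is to call the decoder on its own, outside FastStream's dependency injection, and confirm it returns a parsed object on every run. The sketch below is only an illustration: `run_decoder`, the stub message and `echo_decoder` are hypothetical names, not part of FastStream. It assumes nothing beyond the decoder reading `msg.body`, as `decode_message` above does.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable


def run_decoder(decoder: Callable[[Any], Awaitable[Any]], payload: bytes) -> Any:
    """Run an async decoder against a stub message that only exposes `.body`."""
    # Hypothetical stand-in for RabbitMessage: only the `.body` attribute is provided.
    stub_message = SimpleNamespace(body=payload)
    return asyncio.run(decoder(stub_message))


# Stand-in decoder so the sketch runs without protobuf installed;
# swap in `decode_message` and real serialized bytes to check the actual decoder.
async def echo_decoder(msg: Any) -> bytes:
    return msg.body


result = run_decoder(echo_decoder, b"\x01\x02")
```

If the real `decode_message` returns a populated `Inputs` object every time it is called this way, the decoder itself is unlikely to be the source of the intermittent failure.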

And/Or steps to reproduce the behavior:

  1. ...

Expected behavior: It should always work.

Observed behavior: Sometimes it works as expected:

2024-06-27 08:38:29,998 INFO     - FastStream app starting...
2024-06-27 08:38:30,005 INFO     -      | in-queue |            - `HandleMsg` waiting for messages
2024-06-27 08:38:30,011 INFO     - FastStream app started successfully! To exit, press CTRL+C
2024-06-27 08:38:30,011 INFO     -      | in-queue | a7907fd64f - Received
2024-06-27 08:38:30,012 INFO     -      | in-queue | a7907fd64f - User: John Doe - 1001 registered
2024-06-27 08:38:30,013 INFO     -      | in-queue | a7907fd64f - Processed

Sometimes it doesn't, and throws a pydantic validation error:

2024-06-27 08:39:02,272 INFO     - FastStream app starting...
2024-06-27 08:39:02,280 INFO     -      | in-queue |            - `HandleMsg` waiting for messages
2024-06-27 08:39:02,285 INFO     - FastStream app started successfully! To exit, press CTRL+C
2024-06-27 08:39:02,286 INFO     -      | in-queue | baae69e4dd - Received
2024-06-27 08:39:02,287 ERROR    -      | in-queue | baae69e4dd - ValidationError: 1 validation error for handle_msg
body
  Field required [type=missing, input_value={'logger': <Logger fastst...m.access.rabbit (INFO)>}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.7/v/missing
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/faststream/broker/subscriber/usecase.py", line 338, in consume
    await h.call(
  File "/usr/local/lib/python3.11/dist-packages/faststream/broker/subscriber/call_item.py", line 172, in call
    raise e
  File "/usr/local/lib/python3.11/dist-packages/faststream/broker/subscriber/call_item.py", line 164, in call
    result = await call(message)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/faststream/broker/middlewares/base.py", line 73, in consume_scope
    await self.after_consume(err)
  File "/usr/local/lib/python3.11/dist-packages/faststream/broker/middlewares/base.py", line 54, in after_consume
    raise err
  File "/usr/local/lib/python3.11/dist-packages/faststream/broker/middlewares/base.py", line 64, in consume_scope
    result = await call_next(await self.on_consume(msg))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/faststream/broker/wrapper/call.py", line 201, in decode_wrapper
    return await func(msg)
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/fast_depends/use.py", line 148, in injected_wrapper
    r = await real_model.asolve(
        ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/fast_depends/core/model.py", line 520, in asolve
    final_args, final_kwargs, call = cast_gen.send(kwargs)
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/fast_depends/core/model.py", line 275, in _solve
    casted_model = self.model(**solved_kw)
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pydantic/main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 1 validation error for handle_msg
body
  Field required [type=missing, input_value={'logger': <Logger fastst...m.access.rabbit (INFO)>}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.7/v/missing
2024-06-27 08:39:02,289 INFO     -      | in-queue | baae69e4dd - Processed
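
The `input_value` in the error contains only `logger`, so on the failing run the set of arguments built for `handle_msg` had no `body` entry. The sketch below reproduces that shape of failure with the standard library only. `missing_required` is a hypothetical helper and the handler is simplified; this is not how FastStream or fast_depends resolve arguments internally.

```python
import inspect
from typing import Any, Callable, Dict, List


def missing_required(func: Callable[..., Any], kwargs: Dict[str, Any]) -> List[str]:
    """Return names of parameters without defaults that are absent from kwargs."""
    missing = []
    for name, param in inspect.signature(func).parameters.items():
        # *args / **kwargs never count as required.
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        if param.default is inspect.Parameter.empty and name not in kwargs:
            missing.append(name)
    return missing


# Simplified handler with the same parameter names as in the report.
def handle_msg(body, logger):
    return f"User: {body} registered"


# Successful run: both arguments were resolved.
good_run = missing_required(handle_msg, {"body": "decoded", "logger": "log"})
# Failing run: only `logger` was resolved, as in the traceback's input_value.
bad_run = missing_required(handle_msg, {"logger": "log"})
```

Here `good_run` is empty and `bad_run` names `body`, which matches the "Field required" entry pydantic reports for `body`.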


Environment: Running FastStream 0.5.12 with CPython 3.11.9 on Linux

Additional context: To reproduce, I used the following protobuf file:

syntax = "proto3";

package faststream.rabbitmq;

message Inputs {
  sint32 id = 1;
  string name = 2;
  bytes image = 3;
}