airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.52k stars 128 forks source link

@producer decorator is not working on a function without @consumer decorator #496

Closed davorrunje closed 1 year ago

davorrunje commented 1 year ago

Why is this test failing?

import asyncio
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, TestKafkaBroker

class Data(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.publisher("input_data")
async def to_input_data(msg: Data) -> Data:
    # logger.info(f"to_input_data({msg=}, {logger=})")
    return Data(data=msg.data+5.0)

@broker.subscriber("input_data")
async def on_input_data(msg: Data, logger: Logger):
    logger.info(f"on_input_data({msg=}, {logger=})")

import pytest

@pytest.mark.asyncio
async def test_passing():
    async with TestKafkaBroker(broker) as tester_broker:

        await broker.publish(Data(data=5.2), "input_data")

        on_input_data.mock.assert_called()

@pytest.mark.asyncio
async def test_failing():
    async with TestKafkaBroker(broker) as tester_broker:

        await to_input_data(Data(data=0.2))

        # Why is this failing?
        on_input_data.mock.assert_called()
Lancetnik commented 1 year ago

I suppose we shouldn't support this behavior cuz it is a little implicit than direct producer call and wrapping the same function

Also we have a big troubles with this feature implementation cuz we doesn't patch an original function behavior at all for now.

davorrunje commented 1 year ago

If this is too difficult to implement, we can live with it. However, we should somehow notify the user that we do not support this usage of the producer decorator. I would either use logger.warning or raise a NotImplemented exception in such cases. How difficult is it to add a logger message or exception in such case?

Lancetnik commented 1 year ago

https://github.com/airtai/fastkafka/blob/FastStream/faststream/broker/wrapper.py#L90

No way for now, it is another library responsibility

davorrunje commented 1 year ago

@Lancetnik is this comment about calling mocks with Pydantic classes? The question above is about having a function decorated with producer only.

Lancetnik commented 1 year ago

@davorrunje Yes, I'm sorry about a wrong Issue answer: it's not very convenient to work from the phone

Regarding the warning when calling functions: I deliberately leave the functions wrapped by our decorators unchanged. This is a common use case when testing HTTP frameworks: test handlers as if they were regular functions. Therefore, I do not modify them with any side effects (like publishing or warnings).

publisher decorator works only on functions that are already wrapped by the subscriber - thus the user creates an obvious data pipeline unit.

It seems to me that the reason why you are trying to send a message by calling a wrapped function is just a habit from FastKafka. I assume that new users will not expect this behavior (because it is not entirely obvious), especially if they do not find such example in the documentation.

I think it will be enough to display only the correct examples in the documentation, and also indicate that the framed functions do not have side effects.

If you want to add a warning or modify the function with some additional behavior, you can do it here, but I don't think it's necessary

davorrunje commented 1 year ago

Thanx for the explanation. I'll close the issue now. Let's see if this is a problem in practice or not before implementing enything.

4ydan commented 7 months ago

So is it still not possible to only publish messages. I have a FastAPI app which publish messages to a stream. Is there any way to do this, without subscribing. I have tried to do it after the docs but failed... Ok it works, I forgot to add "stream=" in publish()

router = RedisRouter("redis://localhost:6379")
app = FastAPI(lifespan=router.lifespan_context)
v1 = FastAPI(version="0.0.1", openapi_url="/openapi.json")

@v1.get("/hello")
async def hello_http():
    await router.broker.publish("Test", stream="Test")
    return "Hello"
app.include_router(router)