airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.

Producer not working if there is a subscriber with batch set to True on the same topic #497

Closed. davorrunje closed this issue 10 months ago

davorrunje commented 10 months ago

Why is this failing?

from typing import List

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, TestKafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test")
async def handle(msg: str, logger: Logger):
    logger.info(msg)

@broker.subscriber("test_batch", batch=True)
async def handle_batch(msg: List[str], logger: Logger):
    logger.info(msg)

import pytest

@pytest.mark.asyncio
async def test_me():
    async with TestKafkaBroker(broker) as test_broker:
        # This works
        await test_broker.publish("123", "test")

        # why is this failing?
        await test_broker.publish("I", "test_batch")
Lancetnik commented 10 months ago

Fixed in https://github.com/airtai/fastkafka/pull/498

rjambrecic commented 10 months ago

@Lancetnik How can I test batch consumption for the following example?

from typing import List

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, TestKafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test_batch", batch=True)
async def handle_batch(msg: List[str], logger: Logger):
    logger.info(msg)

import pytest

@pytest.mark.asyncio
async def test_me():
    async with TestKafkaBroker(broker) as test_broker:

        # await test_broker.publish_batch(["I", "am", "FastStream"], topic="test_batch")
        await test_broker.publish_batch("I", topic="test_batch")
        await test_broker.publish_batch("am", topic="test_batch")
        await test_broker.publish_batch("FastStream", topic="test_batch")

        handle_batch.mock.assert_called_with(["I", "am", "FastStream"])
Lancetnik commented 10 months ago

await test_broker.publish_batch("I", "am", "FastStream", topic="test_batch")
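
For reference, a complete test built around that single call might look like this (a sketch reusing the handler from the snippet above):

from typing import List

import pytest

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, TestKafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test_batch", batch=True)
async def handle_batch(msg: List[str], logger: Logger):
    logger.info(msg)

@pytest.mark.asyncio
async def test_me():
    async with TestKafkaBroker(broker) as test_broker:
        # publish all three messages as a single batch
        await test_broker.publish_batch("I", "am", "FastStream", topic="test_batch")

        # the handler receives the whole batch as one list
        handle_batch.mock.assert_called_with(["I", "am", "FastStream"])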

rjambrecic commented 10 months ago

What about tests for Pydantic messages, @Lancetnik?

from typing import List
from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, TestKafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        examples=["Hello"],
        description="Demo hello world message",
    )

@broker.subscriber("test_batch", batch=True)
async def handle_batch(msg: List[HelloWorld], logger: Logger):
    logger.info(msg)

import pytest

@pytest.mark.asyncio
async def test_me():
    async with TestKafkaBroker(broker) as test_broker:
        await test_broker.publish_batch(HelloWorld(msg="First Hello"), HelloWorld(msg="Second Hello"), topic="test_batch")

        # we don't want dictionaries
        print(handle_batch.mock.call_args)  # prints: call([{'msg': 'First Hello'}, {'msg': 'Second Hello'}])
        # we want Pydantic model instances
        handle_batch.mock.assert_called_with([HelloWorld(msg="First Hello"), HelloWorld(msg="Second Hello")])
Lancetnik commented 10 months ago

In the mock we check the raw message body, before it is serialized into the handler's annotated types, so we can't check Pydantic models here.
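
Given that, the test above can assert against the form the mock actually receives, for example by comparing to the models' dict representation (a sketch replacing only the final assertion; model_dump() is Pydantic v2, use .dict() on v1):

        # compare against the dicts the mock actually received
        handle_batch.mock.assert_called_with(
            [HelloWorld(msg="First Hello").model_dump(), HelloWorld(msg="Second Hello").model_dump()]
        )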

davorrunje commented 10 months ago

@Lancetnik How difficult would it be to add support for Pydantic models? E.g., we could use a mock wrapper and deserialize the calls into the Pydantic class.

Lancetnik commented 10 months ago

@davorrunje I implemented this approach deliberately: it allows the user to abstract away from the project's implementation details and test exactly the schema of the received data (because in the mock object they have access to the message in the form in which it is transmitted through the broker).

Also, right now we can't access the serialized message body (the one already cast into your types), because that part is implemented inside FastDepends.

https://github.com/airtai/fastkafka/blob/FastStream/faststream/broker/wrapper.py#L90

But I can implement this functionality if it is necessary.
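
For what it's worth, the wrapper idea from @davorrunje's comment could already be approximated on the user side, roughly like this (a hypothetical helper, not part of FastStream; assumes Pydantic v2's model_validate):

from typing import List, Type

from pydantic import BaseModel

def batch_call_as_models(mock, model: Type[BaseModel]) -> List[BaseModel]:
    # take the raw dicts the handler mock was called with
    # and validate them back into the given Pydantic model
    (raw_batch,), _kwargs = mock.call_args
    return [model.model_validate(item) for item in raw_batch]

# usage inside the test above:
# assert batch_call_as_models(handle_batch.mock, HelloWorld) == [
#     HelloWorld(msg="First Hello"),
#     HelloWorld(msg="Second Hello"),
# ]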