airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
3.13k stars 160 forks source link

Support checking keys in tests #557

Open davorrunje opened 1 year ago

davorrunje commented 1 year ago

Please see the example and let me know what you think:

import pytest

from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, TestKafkaBroker

class Data(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

to_output_data = broker.publisher("output_data")

@broker.subscriber("input_data")
# we should be able to get the key (and other stuff like partion, offset, etc injected)
async def on_input_data(msg: Data, logger: Logger, key: bytes) -> None:
    logger.info(f"on_input_data({msg=})")
    await to_output_data.publish(Data(data=msg.data + 1.0), key=b"key")

@broker.subscriber("output_data")
async def on_output_data(msg: Data, logger: Logger, key: bytes) -> None:
    logger.info(f"on_output_data({msg=})")

@pytest.mark.skip("we are not checking the key")
@pytest.mark.asyncio
async def test_base_app():
    async with TestKafkaBroker(broker):
        # we should be able to publish a message with the key
        await broker.publish(Data(data=0.2), "input_data", key=b"my_key")

        # we need to check the key as well
        on_input_data.mock.assert_called_once_with(dict(Data(data=0.2)), key=b"my_key")
        to_output_data.mock.assert_called_once_with(dict(Data(data=1.2)), key=b"key")
        on_output_data.mock.assert_called_once_with(dict(Data(data=1.2)), key=b"key")
Lancetnik commented 1 year ago

For now user is able to access any message field via the Context

In the tests case, I am not sure that is required Will answer with examples in a few hours

davorrunje commented 1 year ago

it would be very useful to be able to check Context in the tests, how difficult would that be to implement?

Lancetnik commented 1 year ago

We trigger mock here: https://github.com/airtai/fastkafka/blob/FastStream/faststream/broker/wrapper.py#L90 It is not a big problem to add something extra to it, but we need to create an API this extra information

davorrunje commented 1 year ago

This is not a priority right now, but I'll leave the issue for later.

Lancetnik commented 11 months ago

I have an idea - we can just add assert_called_* methods to the consumer itself

This way we can override any information before pass it to the inner mock object and add some extra checks

consumer.assert_called_once_with({"user": "John", "age": 25})
# instead
consumer.mock.assert_called_once_with({"user": "John", "age": 25})

This way we can make the following checks are equal:

consumer.assert_called_once_with({"user": "John", "age": 25})
consumer.assert_called_once_with(user="John", age=25)
consumer.assert_called_once_with(User(user="John", age=25))

And add any context check feature

consumer.assert_called_once_with(
    body={"user": "John", "age": 25},
    context={
        "message.key": b"key"
    }
)
davorrunje commented 11 months ago

not bad, I like it

Lancetnik commented 11 months ago

Alternatively (and much easier) we can add assert_called_with_context method to check just a context

consumer.mock.assert_called_once_with({"user": "John", "age": 25})
consumer.assert_called_with_context({"message.key": b"key"})