airtai / fastkafka

FastKafka is a powerful and easy-to-use Python library for building asynchronous web services that interact with Kafka topics. Built on top of Pydantic, AIOKafka and AsyncAPI, FastKafka simplifies the process of writing producers and consumers for Kafka topics.
https://fastkafka.airt.ai
Apache License 2.0

If the object (message) is changed in consumes/produces functions, tester doesn't work properly. #60

Open rjambrecic opened 12 months ago

rjambrecic commented 12 months ago

e.g.

class Currency(BaseModel):
    currency: str = Field(..., description="Currency")

@app.consumes(prefix="on", topic="store_product")
async def on_store_product(msg: Currency):
    msg.currency = "EUR"

async with Tester(app).using_inmemory_broker() as tester:
    await tester.to_store_product(Currency(currency="HRK"))
    await app.awaited_mocks.on_store_product.assert_called_with(
        Currency(currency="HRK"),
        timeout=5
    )

The assertion fails, because the mock ends up holding the message as modified by the handler: AssertionError: expected call not found. Expected: mock(Currency(currency='HRK')) Actual: mock(Currency(currency='EUR'))
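
The same effect can be reproduced without Kafka. This is a minimal sketch, assuming the tester's mocks behave like `unittest.mock` objects, which store references to call arguments rather than copies. The dataclass is only a stand-in for the Pydantic model, and the handler calls the mock directly instead of going through `Tester`.

```python
from dataclasses import dataclass
from unittest.mock import MagicMock


@dataclass
class Currency:
    # Stand-in for the Pydantic model from the report (assumption).
    currency: str


mock = MagicMock()


def on_store_product(msg: Currency) -> None:
    mock(msg)             # the mock records a reference to msg, not a copy
    msg.currency = "EUR"  # the in-place change is visible through that reference


on_store_product(Currency(currency="HRK"))

recorded = mock.call_args.args[0]
print(recorded)  # Currency(currency='EUR')
```

Asserting `mock.assert_called_with(Currency(currency="HRK"))` at this point raises the same kind of `AssertionError` as above.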

Quick fix: at the beginning of each consumes/produces function, add msg_copy = msg.model_copy() and use the copy in the rest of the function.
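
A sketch of the quick fix, again without Kafka. It assumes the mock behaves like `unittest.mock.MagicMock` and uses a dataclass as a stand-in for the Pydantic model, with `dataclasses.replace` playing the role of `msg.model_copy()`. The handler below is called directly and is not a real FastKafka consumer.

```python
from dataclasses import dataclass, replace
from unittest.mock import MagicMock


@dataclass
class Currency:
    # Stand-in for the Pydantic model from the report (assumption).
    currency: str


mock = MagicMock()


def on_store_product(msg: Currency) -> Currency:
    mock(msg)                 # the mock records a reference to the original msg
    msg_copy = replace(msg)   # dataclass analogue of msg.model_copy()
    msg_copy.currency = "EUR" # only the copy is modified
    return msg_copy


result = on_store_product(Currency(currency="HRK"))

# The original message is untouched, so this assertion passes.
mock.assert_called_with(Currency(currency="HRK"))
```

The cost is one extra copy per message, paid only in the functions that actually mutate their input.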

Good fix: add msg.model_copy() to the Tester implementation
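
One way this could look is a mock that copies its arguments before recording them, a pattern described in the `unittest.mock` documentation for mutable arguments. This is a hypothetical sketch, not FastKafka's actual `Tester` code: `CopyingMock` is an illustrative name, the dataclass stands in for the Pydantic model, and `deepcopy` stands in for `msg.model_copy()`.

```python
from copy import deepcopy
from dataclasses import dataclass
from unittest.mock import MagicMock


@dataclass
class Currency:
    # Stand-in for the Pydantic model from the report (assumption).
    currency: str


class CopyingMock(MagicMock):
    """Hypothetical mock that snapshots its arguments at call time."""

    def __call__(self, /, *args, **kwargs):
        # Record copies, so later mutation of the originals is not visible.
        return super().__call__(*deepcopy(args), **deepcopy(kwargs))


mock = CopyingMock()


def on_store_product(msg: Currency) -> None:
    mock(msg)             # a copy of msg is recorded
    msg.currency = "EUR"  # mutating the original no longer affects the record


on_store_product(Currency(currency="HRK"))

# Passes: the recorded copy still has the value the message arrived with.
mock.assert_called_with(Currency(currency="HRK"))
```

With this approach the copy happens only in test code, so user functions stay unchanged and production consumers pay no overhead.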

davorrunje commented 11 months ago

Why do you think creating a copy is a good idea? It is clearly overhead and not needed in most use cases. If needed, it is easy to create a copy in the body of the function.