airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.2k stars 106 forks source link

Propagate exceptions from FastKafka to Tester #121

Closed davorrunje closed 1 year ago

davorrunje commented 1 year ago

When running in serve mode, FastKafka should log all exceptions and just continue processing messages.

When running under Tester, FastKafka should propagate its exceptions to Tester.

app = FastKafka()

class Data(BaseModel):
  a: str

@app.consumes()
def on_data(data: Data):
  raise ValueError(42)

@app.produces()
def to_db(x: int) -> data: Data:
  return "whatever"

with pytest.raises(ValueError) as e:
   tester = Tester(app)
    with tester.
        tester.to_data(Data("fortytwo")

await tester.awaitable_mocks.on_data.assert_raised_exception

assert e.value ==42
davorrunje commented 1 year ago

I had a quick look through the code snippets and docs so maybe I missed it, what happens if the message received on the topic doesn't match the pydantic model? Is there an error handler or callback?

https://news.ycombinator.com/item?id=35086594

davorrunje commented 1 year ago

https://kai-waehner.medium.com/error-handling-via-dead-letter-queue-in-apache-kafka-903794d7cf78

davorrunje commented 1 year ago

Closed by #278