airtai / fastkafka

FastKafka is a powerful and easy-to-use Python library for building asynchronous web services that interact with Kafka topics. Built on top of Pydantic, AIOKafka and AsyncAPI, FastKafka simplifies the process of writing producers and consumers for Kafka topics.
https://fastkafka.airt.ai
Apache License 2.0

Implement @process decorator that combines @consumes and @produces #7

Open sternakt opened 1 year ago

sternakt commented 1 year ago
davorrunje commented 1 year ago

Also, a question about the library design after reading the readme:

You currently have an example where you consume a message in a function decorated with a consumer decorator, which then calls a produce-decorated function to publish the result on a different queue.

It might make sense to have a dedicated decorator for functions that both consume and publish, where the consumed type is the function argument and the produced type is the return type, all in one function. Currently it is not clear to me what would happen if you, for instance, consume a message, process it and publish the result, and the consumer function then raises an exception or otherwise crashes.

I'm assuming the consumed message won't be acked at that point, but the computed result will already have been published on the other queue. Correct?

Anyway, food for thought I guess. These are the real struggles with pubsub systems, where you don't want to generate duplicate messages.

https://www.reddit.com/r/Python/comments/11paz9u/comment/jbxfu6m/?utm_source=share&utm_medium=web2x&context=3
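
For illustration, the combined decorator suggested above could look roughly like the sketch below. This is not FastKafka's API: `process`, `InMemoryBroker` and the `(offset, msg)` handler signature are hypothetical names invented for this example, and the broker is an in-memory stand-in rather than Kafka. It only shows the ordering being asked for: the consumed message is the function argument, the return value is what gets published, and the ack happens only after the publish.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, List, Tuple


class InMemoryBroker:
    """Hypothetical stand-in for Kafka: records published messages and acked offsets."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, Any]] = []
        self.acked: List[int] = []


def process(broker: InMemoryBroker, output_topic: str):
    """Hypothetical decorator: consume the argument, publish the return value."""

    def decorator(func: Callable[[Any], Awaitable[Any]]):
        @functools.wraps(func)
        async def handler(offset: int, msg: Any) -> Any:
            # If func raises, nothing is published and nothing is acked.
            result = await func(msg)
            broker.published.append((output_topic, result))
            # Ack only after the result has been published.
            broker.acked.append(offset)
            return result

        return handler

    return decorator


broker = InMemoryBroker()


@process(broker, output_topic="predictions")
async def double(value: int) -> int:
    if value < 0:
        raise ValueError("negative input")
    return value * 2


async def main() -> None:
    await double(0, 21)       # succeeds: published, then acked
    try:
        await double(1, -1)   # fails inside the function body
    except ValueError:
        pass                  # neither published nor acked


asyncio.run(main())
```

With this ordering, an exception raised inside the function body leaves no published result behind. A crash between the publish and the ack would still produce a duplicate on redelivery, so closing that gap needs the publish and the offset commit to happen atomically, for example in a Kafka transaction.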

sternakt commented 1 year ago

Useful AIOKafka docs for the transactional process: https://aiokafka.readthedocs.io/en/stable/examples/transaction_example.html#transaction-example

chr1st1ank commented 1 year ago

Please note that aiokafka currently doesn't seem to implement transactions correctly, as explained in this GitHub issue: https://github.com/aio-libs/aiokafka/issues/844. I'm using aiokafka with transactions in practice anyway, and it works, but our use case tolerates occasional duplicates as long as they don't happen often. If you can't afford duplicates, I would be careful.