silviucpp / erlkaf

Erlang kafka driver based on librdkafka
MIT License

How to use EOD (transactional.id) #26

Closed. matreyes closed this issue 1 month ago.

matreyes commented 3 years ago

Hi, I recently started testing erlkaf after a year of using brod, and I'm very impressed by how well it works. Congratulations! Has anyone tried to implement a consumer -> producer (stream processing) job with transactional_id to ensure exactly-once delivery? What would that look like? Would we need a new type of consumer/producer group?

silviucpp commented 3 years ago

Have you tried setting transactional_id on the producer? I haven't studied whether this is the only thing needed or whether other changes are required as well. I'm not currently using this feature.

matreyes commented 3 years ago

Hi, thanks for answering!

The idea is to have one producer (with a unique transactional_id) for each consumer. So I was thinking about how to spawn the producer from the consumer itself, so that both exit together.

Also see the discussion here: https://stackoverflow.com/questions/50335227/how-to-pick-a-kafka-transaction-id

The question is what happens to the producer when consumers are rebalanced. One idea is to have a pool of producers, one per partition, each with a transactional_id of the form <prefix>.<group.id>.<topic>.<partition>, and have each consumer use the producers that match its assigned partitions. I'm not sure what advantage this approach has over attaching the producer to the consumer and using a transactional_id like <prefix>.<group.id>.<topic>.<consumer_pid> (maybe they chose the first option because they don't have our beautiful process model).
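The two naming schemes can be sketched as follows (Python, illustrative only; the function names are hypothetical, and consumer_id stands in for something like an Erlang pid rendered as a string). The usual argument for the per-partition scheme is zombie fencing: when a partition moves to another consumer, the new owner's producer reuses the same transactional_id, so its init-transactions call bumps the producer epoch and fences the old producer. With a per-consumer id, the new owner gets a different id and nothing fences a stalled predecessor. Since Kafka 2.5 (KIP-447), offsets sent to the transaction together with the consumer group metadata let the broker fence through the consumer group instead, which makes one producer per consumer a viable design, assuming both broker and client support it.

```python
"""Hypothetical helpers for the two transactional_id naming schemes."""
from typing import Iterable


def partition_txn_id(prefix: str, group_id: str, topic: str, partition: int) -> str:
    """Scheme 1: one transactional_id per input partition.

    The id depends only on the partition, so whichever consumer owns that
    partition after a rebalance reuses the same id and fences the old producer.
    """
    return f"{prefix}.{group_id}.{topic}.{partition}"


def consumer_txn_id(prefix: str, group_id: str, topic: str, consumer_id: str) -> str:
    """Scheme 2: one transactional_id per consumer instance (e.g. a pid string).

    The id changes with the consumer process, so on its own it does not fence
    a zombie producer that previously handled the same partitions.
    """
    return f"{prefix}.{group_id}.{topic}.{consumer_id}"


def ids_for_assignment(prefix: str, group_id: str,
                       assignment: Iterable[tuple[str, int]]) -> dict[tuple[str, int], str]:
    """Map each assigned (topic, partition) to the scheme-1 producer id it should use."""
    return {(topic, partition): partition_txn_id(prefix, group_id, topic, partition)
            for topic, partition in assignment}


if __name__ == "__main__":
    print(ids_for_assignment("app", "g1", [("orders", 0), ("orders", 1)]))
    print(consumer_txn_id("app", "g1", "orders", "<0.123.0>"))
```

On a rebalance, a consumer would recompute ids_for_assignment for its new assignment and start or stop producers accordingly.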

matreyes commented 3 years ago

Also, to use the transactional API, there are two functions that need to be implemented: rd_kafka_init_transactions() and rd_kafka_commit_transaction(). Is there any interest in implementing them?
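For reference, librdkafka's transactional flow involves more calls than those two: rd_kafka_init_transactions() once per producer, then for each batch rd_kafka_begin_transaction(), the produce calls, rd_kafka_send_offsets_to_transaction() with the consumer's rd_kafka_consumer_group_metadata(), and finally rd_kafka_commit_transaction(), or rd_kafka_abort_transaction() on failure. The input consumer should run with enable.auto.commit=false, and downstream readers with isolation.level=read_committed. The Python sketch below is a minimal illustration, not erlkaf code: its method names mirror confluent-kafka's Producer, which wraps the same librdkafka calls, while the TxnProducer protocol, the dict-shaped offsets (the real clients take TopicPartition lists) and the RecordingProducer test double are hypothetical simplifications so that it runs without a broker.

```python
"""Sketch of one consume-transform-produce step inside a Kafka transaction."""
from typing import Any, Callable, Iterable, Protocol

# A consumed record, simplified to (topic, partition, offset, value).
Record = tuple[str, int, int, bytes]


class TxnProducer(Protocol):
    """Hypothetical interface; names mirror confluent-kafka's Producer."""
    def begin_transaction(self) -> None: ...
    def produce(self, topic: str, value: bytes) -> None: ...
    def send_offsets_to_transaction(self, offsets: dict[tuple[str, int], int],
                                    group_metadata: Any) -> None: ...
    def commit_transaction(self) -> None: ...
    def abort_transaction(self) -> None: ...


def process_batch(producer: TxnProducer, group_metadata: Any,
                  batch: Iterable[Record], out_topic: str,
                  transform: Callable[[bytes], bytes]) -> dict[tuple[str, int], int]:
    """Produce transformed records and commit the input offsets atomically.

    Assumes init_transactions() was called once when the producer was created.
    Returns the offsets committed for each input partition.
    """
    producer.begin_transaction()
    try:
        next_offsets: dict[tuple[str, int], int] = {}
        for topic, partition, offset, value in batch:
            producer.produce(out_topic, transform(value))
            # A committed offset is the *next* offset to read, hence +1.
            key = (topic, partition)
            next_offsets[key] = max(next_offsets.get(key, 0), offset + 1)
        # The input offsets become part of the same transaction as the output.
        producer.send_offsets_to_transaction(next_offsets, group_metadata)
        producer.commit_transaction()
        return next_offsets
    except Exception:
        # Neither the produced records nor the offsets become visible
        # to read_committed consumers.
        producer.abort_transaction()
        raise


class RecordingProducer:
    """Hypothetical test double that records calls instead of using a broker."""

    def __init__(self) -> None:
        self.calls: list[str] = []
        self.produced: list[tuple[str, bytes]] = []
        self.sent_offsets: dict[tuple[str, int], int] = {}

    def begin_transaction(self) -> None:
        self.calls.append("begin")

    def produce(self, topic: str, value: bytes) -> None:
        self.produced.append((topic, value))

    def send_offsets_to_transaction(self, offsets: dict[tuple[str, int], int],
                                    group_metadata: Any) -> None:
        self.calls.append("send_offsets")
        self.sent_offsets = dict(offsets)

    def commit_transaction(self) -> None:
        self.calls.append("commit")

    def abort_transaction(self) -> None:
        self.calls.append("abort")


if __name__ == "__main__":
    p = RecordingProducer()
    batch = [("in", 0, 41, b"a"), ("in", 0, 42, b"b"), ("in", 1, 7, b"c")]
    print(process_batch(p, None, batch, "out", bytes.upper))
    print(p.calls)
```

After an abort, the consumer must also be rewound to its last committed offsets before retrying, otherwise the aborted batch is skipped; this sketch leaves that to the caller.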