fd4s / fs2-kafka

Functional Kafka Streams for Scala
https://fd4s.github.io/fs2-kafka
Apache License 2.0

Support for transactions #128

Closed: danxmoran closed this issue 5 years ago

danxmoran commented 5 years ago

Kafka has special APIs to support transactional read-process-write message flows. The gist seems to be:

  1. Producers get an extra config setting, transactional.id, and call initTransactions() on startup.
  2. Consumer offsets are committed through the producer rather than the consumer, and each batch is wrapped in beginTransaction()/commitTransaction() calls.
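As a minimal sketch of step 1, the producer settings might look like the following. The keys bootstrap.servers, transactional.id and enable.idempotence are standard Kafka producer configs; the helper name transactionalProducerProps and the values passed to it are hypothetical placeholders. Creating the KafkaProducer and calling initTransactions() are only shown as comments, since they need kafka-clients on the classpath and a running broker.

```scala
import java.util.Properties

// Hypothetical helper: builds settings for a transactional producer.
def transactionalProducerProps(bootstrapServers: String, transactionalId: String): Properties = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  // Should stay stable across restarts of the same logical producer, so the
  // broker can fence off older instances that use the same id.
  props.put("transactional.id", transactionalId)
  // Transactions require idempotence; it is set explicitly here.
  props.put("enable.idempotence", "true")
  props
}

// Placeholder values, for illustration only.
val props = transactionalProducerProps("localhost:9092", "my-app-tx-0")

// With kafka-clients available, startup would continue roughly like this (not run here):
//   val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
//   producer.initTransactions()
```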

What would be the best way to add this functionality to fs2-kafka? I was looking through Alpakka for reference and it looks like they separate "normal" and transactional types for both producers and consumed messages. I like the idea of using types to force the separation of use-cases, but I'm not sure how much of a maintenance burden it'd create for this project.

My team has a need for this functionality, so I'd be happy to help implement the new features.

vlovgr commented 5 years ago

Having looked at it briefly, I think we could introduce a TransactionalKafkaProducer (or similar) that calls initTransactions() when it is created, and whose produce operation (or similarly named) runs beginTransaction, then send for every record in the ProducerMessage, then sendOffsetsToTransaction, then commitTransaction. The library user then only needs to create a ProducerMessage containing however many records should go in the same transaction.
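A minimal sketch of that shape, written against a hypothetical TxClient trait that mirrors the transactional methods of Kafka's producer so it can run without a broker. TxClient, TxMessage, SimpleTransactionalProducer and RecordingClient are all illustrative names, not fs2-kafka's or Kafka's API, and offsets are simplified to a partition-number-to-offset map. The abortTransaction() call on failure goes beyond the proposal above.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for the transactional subset of Kafka's producer.
trait TxClient[K, V] {
  def initTransactions(): Unit
  def beginTransaction(): Unit
  def send(key: K, value: V): Unit
  def sendOffsetsToTransaction(offsets: Map[Int, Long], groupId: String): Unit
  def commitTransaction(): Unit
  def abortTransaction(): Unit
}

// Hypothetical message: all records plus the consumed offsets go into one transaction.
final case class TxMessage[K, V](records: List[(K, V)], offsets: Map[Int, Long])

final class SimpleTransactionalProducer[K, V](client: TxClient[K, V], groupId: String) {
  // Runs once, when the producer is created.
  client.initTransactions()

  def produce(message: TxMessage[K, V]): Unit = {
    client.beginTransaction()
    try {
      message.records.foreach { case (k, v) => client.send(k, v) }
      client.sendOffsetsToTransaction(message.offsets, groupId)
      client.commitTransaction()
    } catch {
      case NonFatal(e) =>
        // Abort so read_committed consumers never see a partial batch, then rethrow.
        client.abortTransaction()
        throw e
    }
  }
}

// Fake client that records the order of calls; optionally fails on one value.
final class RecordingClient(failOnValue: Option[String] = None) extends TxClient[String, String] {
  val calls = ListBuffer.empty[String]
  def initTransactions(): Unit = calls += "init"
  def beginTransaction(): Unit = calls += "begin"
  def send(key: String, value: String): Unit = {
    if (failOnValue.contains(value)) throw new RuntimeException(s"send failed: $value")
    calls += s"send:$key=$value"
  }
  def sendOffsetsToTransaction(offsets: Map[Int, Long], groupId: String): Unit =
    calls += s"offsets:$groupId"
  def commitTransaction(): Unit = calls += "commit"
  def abortTransaction(): Unit = calls += "abort"
}

val client = new RecordingClient()
val producer = new SimpleTransactionalProducer(client, "my-group")
producer.produce(TxMessage(List("a" -> "1", "b" -> "2"), Map(0 -> 42L)))
// client.calls: init, begin, send:a=1, send:b=2, offsets:my-group, commit
```

Keeping initTransactions() in the constructor means a user cannot forget it, and the message type is the only thing the caller builds. A real implementation would also need to treat fatal errors such as a fenced producer differently, since the Kafka client docs say the producer must be closed rather than aborted in that case.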

For the consumer, it seems we technically don't need to do anything except set isolation.level to read_committed. We could introduce a separate type for it, but I'm not sure it's worth it unless there is something more we have to do.
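A minimal sketch of the consumer side as plain Kafka settings. isolation.level, group.id and enable.auto.commit are standard Kafka consumer configs (isolation.level defaults to read_uncommitted); the helper name readCommittedConsumerProps and the values are hypothetical. Disabling enable.auto.commit is an addition beyond the comment above: since offsets are committed through the producer's transaction, automatic consumer commits would be redundant.

```scala
import java.util.Properties

// Hypothetical helper: consumer settings for the "read" side of read-process-write.
def readCommittedConsumerProps(bootstrapServers: String, groupId: String): Properties = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("group.id", groupId)
  // Only return records from committed transactions.
  props.put("isolation.level", "read_committed")
  // Offsets are committed via the producer's transaction, not by the consumer.
  props.put("enable.auto.commit", "false")
  props
}

// Placeholder values, for illustration only.
val consumerProps = readCommittedConsumerProps("localhost:9092", "my-group")
```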

I think it would be a great addition, so happy to give a hand if you want to give it a go.

vlovgr commented 5 years ago

Fixed by #130.

@danxmoran If you want to create a separate issue for the automatic management of transactional.id, then feel free.