akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

Support producer only Kafka transactions #1075

Open seglo opened 4 years ago

seglo commented 4 years ago

Short description

Support producer only Kafka transactions in Transactional.sink|flow.

Details

Alpakka Kafka currently only supports a consume, transform, produce workflow for Kafka transactions. In some cases it may be useful to enable transactions only for messages produced to Kafka, for example when the source data is not currently in Kafka. This would offer fewer guarantees than the standard consume, transform, produce workflow, because the user would need to take care not to send duplicate messages between restarts of the workload. It would still be useful for ensuring that all messages within a transaction are either committed or rolled back, for example when using MultiMessage to ensure that all or none of the messages are produced.

One drawback could be that users might want more control over exactly when transactions are committed. Currently we only support transactions of fixed time intervals, similar to Kafka Streams. I don't recommend we give the user more control over when commits occur, but we can make sure that use cases like having all messages from a MultiMessage in a single transaction are supported.
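To make the all-or-nothing behaviour concrete, here is a minimal sketch of a producer-only transaction written directly against the plain Kafka client rather than Alpakka. It is not a proposed Alpakka API. The object name `ProducerOnlyTransaction` and the helpers `sendAllOrNothing` and `transactionalProducer` are hypothetical names chosen for illustration, and the byte-array serializers are an assumption.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{AuthorizationException, OutOfOrderSequenceException, ProducerFencedException}
import org.apache.kafka.common.serialization.ByteArraySerializer

// Hypothetical helper object, for illustration only.
object ProducerOnlyTransaction {

  // Sends all records inside a single Kafka transaction.
  // Assumes producer.initTransactions() has already been called once.
  def sendAllOrNothing[K, V](producer: Producer[K, V], records: Seq[ProducerRecord[K, V]]): Unit =
    try {
      producer.beginTransaction()
      records.foreach(record => producer.send(record))
      // Flushes outstanding sends and commits them atomically.
      producer.commitTransaction()
    } catch {
      case e @ (_: ProducerFencedException | _: OutOfOrderSequenceException | _: AuthorizationException) =>
        // These errors are fatal for the producer: it can only be closed.
        producer.close()
        throw e
      case e: KafkaException =>
        // Any other Kafka error: roll back so none of the records become visible.
        producer.abortTransaction()
        throw e
    }

  // Builds a transactional producer. Setting transactional.id also enables idempotence.
  def transactionalProducer(bootstrapServers: String,
                            transactionalId: String): KafkaProducer[Array[Byte], Array[Byte]] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    producer.initTransactions()
    producer
  }
}
```

Consumers only get the all-or-nothing view if they read with `isolation.level=read_committed`. As noted above, this sketch does nothing to prevent duplicates when the workload restarts and replays records from a source outside Kafka.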


mdedetrich commented 2 years ago

This is also something that we are missing. We are currently writing a backup tool with Kafka, and for the restore portion (where we restore from a backup to a Kafka cluster) we would ideally like to use the Alpakka Transactional API to stream messages out of the backup source, to guarantee an exactly-once restore.

As a workaround we are currently using:

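import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerConfig

// Idempotent, strictly ordered producer settings used in place of transactions.
// `container` is defined elsewhere and supplies the bootstrap servers.
def baseProducerConfig
    : Some[ProducerSettings[Array[Byte], Array[Byte]] => ProducerSettings[Array[Byte], Array[Byte]]] =
  Some(
    _.withBootstrapServers(
      container.bootstrapServers
    ).withProperties(
      Map(
        // Let the broker deduplicate retried sends
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG             -> true.toString,
        // Allow a single in-flight request per connection
        ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString,
        // A batch size of 0 disables batching
        ProducerConfig.BATCH_SIZE_CONFIG                     -> 0.toString
      )
    ).withParallelism(1) // at most one send in parallel in the producer stage
  )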

but this is less than ideal.

wim82 commented 1 year ago

We have the same use case, but I'm also fairly sure I'm not capable of adding support for this :)