swift-server / swift-kafka-client

Apache License 2.0
add approx. transactional api #98

Open blindspotbounty opened 1 year ago

blindspotbounty commented 1 year ago

This PR adds an approximate implementation of the Transactional API for Kafka gsoc.

It builds on some of the ideas expressed in https://github.com/swift-server/swift-kafka-gsoc/issues/78#issuecomment-1642042408

The main ideas are:

  1. Call initTransaction and switch the state in the state machine
  2. Run blocking transactional calls in GlobalQueue
  3. Retry retryable errors
  4. Automatically abort the transaction when an abort error is received
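
Ideas 2 to 4 can be sketched roughly as below. This is only an illustration and not the code in this PR: `TransactionError`, `runBlocking`, `performTransactionalCall` and `maxAttempts` are hypothetical names, and the sketch assumes that "GlobalQueue" means `DispatchQueue.global()` and that errors have already been classified as retryable or abort-required.

```swift
import Foundation

// Hypothetical error classification (an assumption for this sketch).
enum TransactionError: Error, Equatable {
    case retryable      // the same call may simply be repeated
    case abortRequired  // the current transaction must be aborted
    case fatal          // no recovery is attempted here
}

/// Runs a blocking call on a global dispatch queue so that it does not
/// block a thread of the Swift concurrency pool.
func runBlocking<T: Sendable>(
    _ body: @escaping @Sendable () throws -> T
) async throws -> T {
    try await withCheckedThrowingContinuation { continuation in
        DispatchQueue.global().async {
            continuation.resume(with: Result { try body() })
        }
    }
}

/// Retries retryable errors up to `maxAttempts` times and runs `abort`
/// when an abort-required error is received.
func performTransactionalCall<T: Sendable>(
    maxAttempts: Int = 5,
    abort: @escaping @Sendable () throws -> Void,
    _ call: @escaping @Sendable () throws -> T
) async throws -> T {
    var attempt = 0
    while true {
        attempt += 1
        do {
            return try await runBlocking(call)
        } catch TransactionError.retryable where attempt < maxAttempts {
            continue // retry the same blocking call
        } catch TransactionError.abortRequired {
            try await runBlocking(abort)
            throw TransactionError.abortRequired
        }
        // Any other error, or a retryable one after the last attempt,
        // propagates to the caller unchanged.
    }
}
```

A real implementation would probably also want a backoff between retries and a timeout on each blocking call; both are left out to keep the sketch short.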

UPD: after review, the API changed to the following:

To use Kafka transactions, first create a KafkaTransactionalProducerConfiguration:

let config = KafkaTransactionalProducerConfiguration(transactionalID: "1234")

As with KafkaProducer, a KafkaTransactionalProducer is created through a factory method:

let transactionalProducer = try await KafkaTransactionalProducer.makeTransactionalProducerWithEvents(config: config)

To run and commit a transaction, call withTransaction(...):

try await transactionalProducer.withTransaction { transaction in
    // Produce new messages:
    let newMessage = KafkaProducerMessage(
        topic: "<some topic>",
        value: "<some value>"
    )
    try transaction.send(newMessage)
...
    // Commit offsets:
    let partitionList = RDKafkaTopicPartitionList()
    partitionList.setOffset(topic: self.uniqueTestTopic, partition: message.partition, offset: Int64(message.offset))
    try await transaction.send(offsets: partitionList, forConsumer: consumer)
...
}
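
For readers unfamiliar with the scoped-closure pattern, the following sketch shows one way a `withTransaction`-style helper could behave: begin, run the body, commit on success, abort on failure. It is an assumption about the semantics, not the implementation in this PR; `TransactionBackend` and the free function below are hypothetical and stand in for the real producer.

```swift
// Hypothetical backend; stands in for the blocking transaction calls.
protocol TransactionBackend {
    func begin() throws
    func commit() throws
    func abort() throws
}

/// Begins a transaction, runs `body`, commits if it returns and
/// aborts if it (or the commit itself) throws.
func withTransaction<B: TransactionBackend, T>(
    on backend: B,
    _ body: (B) async throws -> T
) async throws -> T {
    try backend.begin()
    do {
        let result = try await body(backend)
        try backend.commit()
        return result
    } catch {
        // Best-effort abort; the original error is the one surfaced.
        try? backend.abort()
        throw error
    }
}
```

With this shape the caller never has to pair begin and commit calls by hand, which is presumably why the closure-based API was preferred after review.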