swift-server / swift-kafka-client

Apache License 2.0
Transactional API #78

Open mr-swifter opened 1 year ago

mr-swifter commented 1 year ago

A transactional API is required to use the exactly-once semantics (EOS) provided by Kafka.

One idea for how to structure this:

blindspotbounty commented 1 year ago

We had a couple of discussions with @mr-swifter regarding a potential transactional API.

The easiest thing would be to implement the calls "as is". However, there is some logic that more or less everyone would need, such as:

  1. Retry committing the transaction if the error is retriable, i.e. rd_kafka_error_is_retriable(error) == 1
  2. Abort the transaction if the error requires an abort, i.e. rd_kafka_error_txn_requires_abort(error) == 1, and return the corresponding error

Omitting insignificant implementation details and making it more abstract, we were using the following code for send/commit and, partially, for aborting a transaction.
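
The two rules above can be read as a small decision function. The following is only a sketch: `TransactionErrorAction` and `nextAction` are hypothetical names, not part of swift-kafka-client or librdkafka, and the three Bool parameters stand in for the results of rd_kafka_error_is_retriable, rd_kafka_error_txn_requires_abort and rd_kafka_error_is_fatal.

```swift
// Hypothetical type for illustration only; not part of swift-kafka-client.
enum TransactionErrorAction: Equatable {
    case retry                // call the same transactional API again
    case abortTransaction     // abort, then start a new transaction
    case fail(isFatal: Bool)  // report the error; fatal means the producer must be recreated
}

// Each flag stands in for the corresponding librdkafka predicate returning 1.
func nextAction(isRetriable: Bool, requiresAbort: Bool, isFatal: Bool) -> TransactionErrorAction {
    // Retriable errors are checked first, then abortable ones.
    if isRetriable { return .retry }
    if requiresAbort { return .abortTransaction }
    return .fail(isFatal: isFatal)
}
```

Keeping the decision separate from the C calls would make the ordering of the checks easy to unit test, assuming the flags are read from the error before it is destroyed.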

func commitTransaction(attempts: UInt, timeoutMs: Int = -1 /* wait until transaction.timeout.ms */) async -> Result<Void, KafkaError> {
    for _ in 0..<attempts {
        let error = await forBlockingFunc {
            rd_kafka_commit_transaction(self.kafkaHandle, Int32(timeoutMs))
        }

        /* check whether the transaction completed successfully */
        if error == nil { return .success(()) }

        /* destroy the error on every exit path of this iteration */
        defer { rd_kafka_error_destroy(error) }

        /* check whether the error is retriable and, if so, retry */
        if rd_kafka_error_is_retriable(error) == 1 { continue }

        /* check whether the transaction needs to be aborted */
        if rd_kafka_error_txn_requires_abort(error) == 1 {
            // at this point we cannot retry; client code should begin the transaction from scratch
            let res = await abortTransaction()
            return /* TransactionAbortedError/InconsistentStateError from `res` */
        }

        let description = String(cString: rd_kafka_error_string(error))
        let isFatal = (rd_kafka_error_is_fatal(error) == 1) // a fatal error requires a producer restart
        return /* Error + isFatal + description */
    }
    return /* out of attempts error */
}
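
To make the control flow above easier to try out without librdkafka, here is a self-contained sketch of the same retry/abort loop. Everything in it is an assumption for illustration: `TxnError`, `CommitFailure` and `commitWithRetry` are hypothetical names, the commit and abort steps are injected as closures, and the loop is synchronous rather than async to keep the example minimal.

```swift
// Hypothetical stand-in for a librdkafka error; the flags mirror the C predicates.
struct TxnError {
    var isRetriable = false
    var requiresAbort = false
    var isFatal = false
    var description = ""
}

// Hypothetical failure type; the real API would presumably map these onto KafkaError.
enum CommitFailure: Error {
    case aborted
    case failed(description: String, isFatal: Bool)
    case outOfAttempts
}

func commitWithRetry(
    attempts: UInt,
    commit: () -> TxnError?,   // returns nil when the commit succeeded
    abort: () -> Void
) -> Result<Void, CommitFailure> {
    for _ in 0..<attempts {
        guard let error = commit() else { return .success(()) }
        // Retriable: try the commit again on the next iteration.
        if error.isRetriable { continue }
        // Abortable: abort here; the caller has to begin a new transaction.
        if error.requiresAbort {
            abort()
            return .failure(.aborted)
        }
        return .failure(.failed(description: error.description, isFatal: error.isFatal))
    }
    return .failure(.outOfAttempts)
}
```

With `attempts: 3` and a commit closure that reports a retriable error twice before succeeding, the function returns `.success` after the third call. Note that the sketch retries immediately; whether a delay between attempts is needed is left open here, as in the code above.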