jonathangenlambda closed this 1 year ago.
@AlexeyRaga sorry, I forgot to add you as a reviewer.
I'm also working on a project that would benefit from this feature. Is this likely to be merged soon?
@james-collapse sorry, I somehow missed the notification.
We found that hw-kafka-client lacks the transactional functionality offered by Kafka, and since our projects need it, we decided to add it. The changes we made are (mostly) quite straightforward:
We added mappings for all transaction-related librdkafka functions to Kafka.Internal.RdKafka: rd_kafka_init_transactions, rd_kafka_begin_transaction, rd_kafka_send_offsets_to_transaction, rd_kafka_commit_transaction, rd_kafka_abort_transaction, rd_kafka_error_is_fatal, rd_kafka_error_is_retriable and rd_kafka_error_txn_requires_abort. We also implemented boolFromCInt, which maps a low-level CInt to a Bool; we needed it for rd_kafka_error_is_fatal, rd_kafka_error_is_retriable and rd_kafka_error_txn_requires_abort.
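For illustration, a helper like boolFromCInt can be as small as a comparison with zero, because the three rd_kafka_error_* predicates listed above return a C int that is non-zero for "true". This is a sketch, not necessarily the code in the PR; the foreign import shown in the comment and the RdKafkaErrorT type name are hypothetical, and the import is left as a comment so the snippet compiles without librdkafka.

```haskell
import Foreign.C.Types (CInt)

-- | Interpret a C int as a boolean: 0 is False, anything else is True.
-- librdkafka's rd_kafka_error_is_fatal, rd_kafka_error_is_retriable and
-- rd_kafka_error_txn_requires_abort all report their answer this way.
--
-- In the binding module the C side might be imported roughly as
-- (hypothetical names, kept as a comment so no librdkafka is needed here):
--
--   foreign import ccall unsafe "rdkafka.h rd_kafka_error_is_fatal"
--     rdKafkaErrorIsFatal :: Ptr RdKafkaErrorT -> IO CInt
boolFromCInt :: CInt -> Bool
boolFromCInt = (/= 0)
```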
For each of the above mappings we implemented a corresponding high-level function that translates the high-level Haskell types into the low-level mappings. We mostly followed the existing code when implementing these functions. They live in a new module, Kafka.Transaction, which we added to the cabal file.
To implement commitOffsetMessageTransaction we added a helper, rdKafkaSendOffsetsToTransaction, which fetches the consumer group metadata behind the scenes, so the client never has to deal with it. This makes the API easier to use, and it was also necessary: we could not expose the rd_kafka_consumer_group_metadata_t struct publicly, because it is only declared, not defined, in rdkafka.h. For this we also needed to add the rd_kafka_consumer_group_metadata and rd_kafka_consumer_group_metadata_destroy mappings.
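A minimal sketch of how the group metadata can stay hidden from the caller: fetch it, pass it on, and always destroy it, using bracket so the destroy step also runs if sending throws. LowLevel, GroupMetadata, Offsets, sendOffsetsToTransaction and recordingLowLevel are hypothetical names; in the real binding the record fields would be FFI calls and the metadata an opaque pointer.

```haskell
import Control.Exception (bracket)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Hypothetical stand-in for the opaque rd_kafka_consumer_group_metadata_t.
newtype GroupMetadata = GroupMetadata String

-- | (topic, partition, next offset to consume) for each consumed partition.
type Offsets = [(String, Int, Int)]

-- | Hypothetical record of low-level calls; each field would be an FFI call.
data LowLevel = LowLevel
  { llGroupMetadata   :: IO GroupMetadata                              -- rd_kafka_consumer_group_metadata
  , llDestroyMetadata :: GroupMetadata -> IO ()                        -- rd_kafka_consumer_group_metadata_destroy
  , llSendOffsets     :: GroupMetadata -> Offsets -> IO (Maybe String) -- rd_kafka_send_offsets_to_transaction
  }

-- | Callers pass only the offsets; the group metadata is obtained and
-- released internally. 'bracket' runs the release even on exceptions.
sendOffsetsToTransaction :: LowLevel -> Offsets -> IO (Maybe String)
sendOffsetsToTransaction ll offsets =
  bracket (llGroupMetadata ll) (llDestroyMetadata ll) $ \meta ->
    llSendOffsets ll meta offsets

-- | A fake that records which calls happen, so the wrapper can be
-- exercised without a broker.
recordingLowLevel :: IORef [String] -> LowLevel
recordingLowLevel ref = LowLevel
  { llGroupMetadata   = record "metadata" >> pure (GroupMetadata "my-group")
  , llDestroyMetadata = \_ -> record "destroy"
  , llSendOffsets     = \_ _ -> record "send" >> pure Nothing
  }
  where
    record name = modifyIORef' ref (++ [name])
```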
We added a new type, TxError, which holds a KafkaError plus flags indicating whether the error is fatal, retriable, or requires the TX to be aborted; it therefore mirrors rd_kafka_error_t. According to https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#transactional-producer, a TX can be retried after a retriable error, but KafkaError carries no information about that, so we introduced TxError to hold precisely this information. Only two functions return this error type, commitTransaction and commitOffsetMessageTransaction, because these are the only cases in which we can retry or abort (see https://github.com/edenhill/librdkafka/blob/master/examples/transactions.c).
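The sketch below shows the kind of decision TxError enables after a failed commit, roughly following the order of checks in librdkafka's examples/transactions.c: stop on fatal errors, abort (and rewind) when the TX requires it, and retry retriable errors. KafkaError here is a stand-in for hw-kafka-client's type; the field names, CommitOutcome and commitWithRetry are hypothetical, and the retry limit is an addition of this sketch.

```haskell
-- | Stand-in for hw-kafka-client's KafkaError (hypothetical, for this sketch).
newtype KafkaError = KafkaError String
  deriving (Eq, Show)

-- | Mirrors rd_kafka_error_t: the error itself plus librdkafka's three flags.
-- Field names are illustrative.
data TxError = TxError
  { txErrorKafka         :: KafkaError
  , txErrorIsFatal       :: Bool
  , txErrorIsRetriable   :: Bool
  , txErrorRequiresAbort :: Bool
  } deriving (Eq, Show)

-- | What the caller should do after trying to commit.
data CommitOutcome
  = Committed
  | MustAbort KafkaError        -- abort the TX and rewind the consumer
  | RetriesExhausted KafkaError -- still retriable, but we stopped trying
  | Fatal KafkaError            -- the producer cannot continue
  deriving (Eq, Show)

-- | Run a commit action (Nothing means success), retrying retriable
-- failures at most @maxRetries@ more times.
commitWithRetry :: Monad m => Int -> m (Maybe TxError) -> m CommitOutcome
commitWithRetry maxRetries commit = go maxRetries
  where
    go n = do
      result <- commit
      case result of
        Nothing -> pure Committed
        Just e
          | txErrorIsFatal e -> pure (Fatal (txErrorKafka e))
          | txErrorRequiresAbort e -> pure (MustAbort (txErrorKafka e))
          | txErrorIsRetriable e && n > 0 -> go (n - 1)
          | txErrorIsRetriable e -> pure (RetriesExhausted (txErrorKafka e))
          -- Unclassified errors are treated as fatal, roughly as transactions.c does.
          | otherwise -> pure (Fatal (txErrorKafka e))
```

In a real program the monad would be IO and the commit action would wrap commitTransaction or commitOffsetMessageTransaction.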
We added extraProp to Producer because it is convenient, and the Producer deserves such a function as much as the Consumer, which already has one.
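A sketch of what extraProp enables on the producer side, for example setting transactional.id, which librdkafka requires for a transactional producer. ProducerProps is a hypothetical stand-in that models only the property map, with later settings overriding earlier ones; it is not hw-kafka-client's ProducerProperties, and the broker address and transactional id are example values.

```haskell
import qualified Data.Map.Strict as Map

-- | Hypothetical stand-in for the producer's properties: just the raw
-- librdkafka key/value configuration.
newtype ProducerProps = ProducerProps (Map.Map String String)
  deriving (Eq, Show)

-- | In this sketch later settings win: in @a <> b@, keys in @b@ override @a@.
instance Semigroup ProducerProps where
  ProducerProps a <> ProducerProps b = ProducerProps (Map.union b a)

instance Monoid ProducerProps where
  mempty = ProducerProps Map.empty

-- | Set a single raw librdkafka property.
extraProp :: String -> String -> ProducerProps
extraProp k v = ProducerProps (Map.singleton k v)

-- | Example: properties for a transactional producer.
txProducerProps :: ProducerProps
txProducerProps =
  extraProp "bootstrap.servers" "localhost:9092"
    <> extraProp "transactional.id" "my-app-tx-1"
```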
We implemented rewindConsumer in Kafka.Consumer, following https://github.com/edenhill/librdkafka/blob/master/examples/transactions.c#L162-L207. Since probably every application that uses Kafka transactions needs it, we think it is a useful addition to the API, not just to our own application.
We added rd_kafka_seek_partitions and implemented a corresponding seekPartitions in Kafka.Consumer, both because we needed it to implement rewindConsumer and because librdkafka deprecates seek (see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L3588-L3620).
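The rewind logic in transactions.c boils down to: take the current assignment, look up the committed offsets, send partitions without a committed offset back to the beginning, and seek all partitions in one batched call. A minimal sketch of that logic, with TopicPartition, SeekTarget, ConsumerOps, rewindTargets and rewindWith as hypothetical names and the consumer calls passed in as plain IO actions:

```haskell
import Data.Int (Int64)

-- | Hypothetical partition identifier: (topic, partition number).
type TopicPartition = (String, Int)

-- | Where a partition should be repositioned to.
data SeekTarget = FromBeginning | AtOffset Int64
  deriving (Eq, Show)

-- | librdkafka reports "no committed offset" as a negative sentinel
-- (RD_KAFKA_OFFSET_INVALID); such partitions are rewound to the beginning.
rewindTargets :: [(TopicPartition, Int64)] -> [(TopicPartition, SeekTarget)]
rewindTargets = map toTarget
  where
    toTarget (tp, off)
      | off < 0   = (tp, FromBeginning)
      | otherwise = (tp, AtOffset off)

-- | Hypothetical consumer operations; a real version would wrap the
-- assignment, committed-offsets and seek-partitions calls.
data ConsumerOps = ConsumerOps
  { opsAssignment :: IO [TopicPartition]
  , opsCommitted  :: [TopicPartition] -> IO [(TopicPartition, Int64)]
  , opsSeek       :: [(TopicPartition, SeekTarget)] -> IO (Maybe String)
    -- ^ one batched seek for all partitions; Nothing on success
  }

-- | Rewind the consumer to its last committed offsets, e.g. after aborting a TX.
rewindWith :: ConsumerOps -> IO (Maybe String)
rewindWith ops = do
  tps <- opsAssignment ops
  if null tps
    then pure Nothing -- nothing assigned, so nothing to rewind
    else do
      committed <- opsCommitted ops tps
      opsSeek ops (rewindTargets committed)
```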
We fixed two compiler warnings in the existing code.
We bumped the version of hw-kafka-client from 5.0.0 to 5.1.0 because, strictly speaking, there are no incompatible API changes. Deprecating seek is not a breaking change, since seek can still be used.
We tested the behaviour of the code through our projects, and it works. We did not test all error scenarios in depth, but the happy path works. The sequence we ran was: init transactions (producer); consume a message (consumer); begin TX (producer); produce a message synchronously (producer); commit offsets in the TX (producer + consumer); commit TX (producer). Result: success.
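The happy path above is a fixed sequence of steps. This sketch abstracts the calls into a hypothetical TxCycle record over any monad m (IO in a real program), which lets the order of the steps be checked without a broker; TxCycle, its fields and happyPath are illustrative names, not part of hw-kafka-client.

```haskell
-- | Hypothetical bundle of the operations used in the tested scenario.
-- In a real program each field would wrap a hw-kafka-client call in IO.
data TxCycle m msg = TxCycle
  { cycleInit        :: m ()          -- init transactions (producer)
  , cycleConsume     :: m msg         -- consume a message (consumer)
  , cycleBegin       :: m ()          -- begin TX (producer)
  , cycleProduce     :: msg -> m ()   -- produce synchronously (producer)
  , cycleSendOffsets :: msg -> m ()   -- commit offsets in the TX (producer + consumer)
  , cycleCommit      :: m ()          -- commit TX (producer)
  }

-- | Run the happy path once, in the order listed above.
happyPath :: Monad m => TxCycle m msg -> m ()
happyPath c = do
  cycleInit c
  msg <- cycleConsume c
  cycleBegin c
  cycleProduce c msg
  cycleSendOffsets c msg
  cycleCommit c
```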