This PR adds a basic integration test for Kafka transactions.
To get this integration test to pass, routing logic for the following message types is added to KafkaSinkCluster:
- InitProducerId
  - if a transaction id is provided: route to the transaction coordinator
  - if no transaction id is provided: route to a random node
- EndTxn
  - route to the transaction coordinator
- AddPartitionsToTxn
  - if version <= 3: route to the transaction coordinator
  - if version > 3: split the request, route each split request to its transaction coordinator, and then rejoin the responses
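The routing rules above can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: `TxnRequest`, `Destination`, and `route` are hypothetical names, and the request types are reduced to the fields that matter for routing. It assumes a version <= 3 AddPartitionsToTxn request carries a single transaction id, while later versions may carry several.

```rust
/// Hypothetical routing target, for illustration only.
#[derive(Debug, PartialEq)]
enum Destination {
    /// Any node in the cluster will do.
    RandomNode,
    /// The coordinator responsible for the given transaction id.
    TransactionCoordinator(String),
}

/// Hypothetical, heavily simplified view of the requests handled here.
enum TxnRequest {
    InitProducerId { transactional_id: Option<String> },
    EndTxn { transactional_id: String },
    AddPartitionsToTxn { version: i16, transactional_ids: Vec<String> },
}

/// Returns one destination per outgoing request.
/// More than one entry means the request is split and the
/// responses have to be rejoined afterwards.
fn route(request: &TxnRequest) -> Vec<Destination> {
    let coordinator = |id: &String| Destination::TransactionCoordinator(id.clone());
    match request {
        TxnRequest::InitProducerId { transactional_id: Some(id) } => vec![coordinator(id)],
        TxnRequest::InitProducerId { transactional_id: None } => vec![Destination::RandomNode],
        TxnRequest::EndTxn { transactional_id } => vec![coordinator(transactional_id)],
        TxnRequest::AddPartitionsToTxn { version, transactional_ids } => {
            if *version <= 3 {
                // Assumption: a single transaction per request, so no split is needed.
                transactional_ids.iter().take(1).map(coordinator).collect()
            } else {
                // One split request per transaction, each sent to its own coordinator.
                transactional_ids.iter().map(coordinator).collect()
            }
        }
    }
}
```

In this sketch the split case is modelled simply as a list of destinations; merging the responses back into one is left out.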
To support this new routing logic, I also had to add logic for fetching the transaction coordinator.
The logic is very similar to finding the group coordinator, since both are fetched through the FindCoordinator request.
So I adjusted the group coordinator lookup logic to also support finding transaction coordinators.
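As a rough sketch of how one lookup path can serve both coordinator kinds: the FindCoordinator request distinguishes them by key type (0 for group, 1 for transaction), so the lookup can be parameterised on that. The names below (`CoordinatorKind`, `CoordinatorCache`, `Lookup`) are hypothetical and only illustrate the idea; they are not the types used in KafkaSinkCluster.

```rust
use std::collections::HashMap;

/// Which kind of coordinator is being looked up.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum CoordinatorKind {
    Group,
    Transaction,
}

impl CoordinatorKind {
    /// Key type value used by the Kafka FindCoordinator request.
    fn key_type(self) -> i8 {
        match self {
            CoordinatorKind::Group => 0,
            CoordinatorKind::Transaction => 1,
        }
    }
}

/// Result of a lookup: either a known broker id, or the
/// FindCoordinator parameters needed to discover it.
#[derive(Debug, PartialEq)]
enum Lookup {
    Known(i32),
    NeedsFindCoordinator { key_type: i8, key: String },
}

/// Hypothetical cache shared by group and transaction coordinators.
#[derive(Default)]
struct CoordinatorCache {
    known: HashMap<(CoordinatorKind, String), i32>,
}

impl CoordinatorCache {
    /// Looks up the coordinator for a group id or transaction id.
    fn lookup(&self, kind: CoordinatorKind, key: &str) -> Lookup {
        match self.known.get(&(kind, key.to_string())) {
            Some(broker_id) => Lookup::Known(*broker_id),
            None => Lookup::NeedsFindCoordinator {
                key_type: kind.key_type(),
                key: key.to_string(),
            },
        }
    }

    /// Records the broker id returned by a FindCoordinator response.
    fn record(&mut self, kind: CoordinatorKind, key: &str, broker_id: i32) {
        self.known.insert((kind, key.to_string()), broker_id);
    }
}
```

Keying the cache on both the kind and the id keeps a group and a transaction that happen to share a name from colliding.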
Future work
Kafka transaction message types not tested or implemented by this PR:
- TxnOffsetCommit
- AddOffsetsToTxn
- WriteTxnMarkers
In a follow-up PR I will introduce more integration tests in an attempt to get a driver to send these requests. If I cannot find a way to trigger them, I will assume that they are used only for broker-to-broker communication and not by clients.
Some info on transactions at the protocol level: https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka