This PR fixes the `NOT_COORDINATOR` warning when Kafka consumers try to commit offsets. It's fixed by routing the `OffsetCommit` requests to the group coordinator in the KafkaSinkCluster method `route_requests()`.

It also adds a new Kafka integration test case `produce_consume_commit_offsets_partitions1` which calls `commit_sync(&offsets)` to commit the offset after a valid consume, and calls `committed` to verify the offset has been successfully committed (refs: Kafka Java driver commitSync(), Kafka Java driver committed(), Kafka C++ driver commit(), Kafka C++ driver committed_offsets()).

`commit_sync` was chosen over `commit_async` due to the following reasons:

- We need to check if the offset has been committed after sending the offset commit request, and hence need the `commit_sync` to block until the commit succeeds.
- It may be possible to wrap `committed()` in the OffsetCommitCallback for the `commitAsync()` method so that the check on committed offsets can be triggered after `commitAsync()` completes. But it seems complicated to implement and probably not worth the effort.

Closes #1687
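
For readers unfamiliar with the routing change, the idea of sending offset commits to the group coordinator can be sketched roughly as below. This is not the actual `route_requests()` code: the `Request`, `Destination`, and `Router` types and the `coordinator_of_group` map are hypothetical names made up for illustration, and the real implementation works on Kafka protocol messages rather than a toy enum.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified request type; not the real message type.
enum Request {
    /// An offset commit for the given consumer group.
    OffsetCommit { group_id: String },
    /// Any request that needs no group-specific routing in this sketch.
    Other,
}

/// Hypothetical routing decision.
#[derive(Debug, PartialEq)]
enum Destination {
    /// Send to the broker with this id, which coordinates the group.
    Broker(i32),
    /// The coordinator is not known yet. A real proxy would have to
    /// discover it first, e.g. with Kafka's FindCoordinator request.
    CoordinatorLookup(String),
    /// No group-specific routing applies.
    AnyBroker,
}

/// Hypothetical router holding the known group coordinators.
struct Router {
    /// Assumed mapping from consumer group id to coordinator broker id.
    coordinator_of_group: HashMap<String, i32>,
}

impl Router {
    /// Decide where a request should go. Offset commits must reach the
    /// group coordinator; any other broker answers NOT_COORDINATOR.
    fn route(&self, request: &Request) -> Destination {
        match request {
            Request::OffsetCommit { group_id } => {
                match self.coordinator_of_group.get(group_id) {
                    Some(broker_id) => Destination::Broker(*broker_id),
                    None => Destination::CoordinatorLookup(group_id.clone()),
                }
            }
            Request::Other => Destination::AnyBroker,
        }
    }
}
```

In this sketch, an `OffsetCommit` for a group whose coordinator is already known goes straight to that broker, while an unknown group triggers a lookup instead of being sent to an arbitrary broker.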