mbrettschneider opened 5 years ago
I ran into the same issue.
After some digging, I can see that rebalancing can cause the topic-partitions parameter of close()
[0] to contain more items than the opened producer map [1] in the sink task.
I think the root cause is:
TL;DR: at [2] currentOffsets
is used to call task.close(),
however it can be updated at [3], which happens before the partition is opened and the producer is prepared for the sink task at [4]. This eventually leaves producerMap
in the sink task out of sync with currentOffsets
in the sink task worker.
In detail, while the consumer polls messages from the broker at [5], the rebalanceException
can happen at [6], and it can happen more than once; however, unlike [3], openPartitions()
is called only the first time.
There is an issue report about kafka-connect-storage-cloud
caused by a similar out-of-sync case at [7].
The solution:
Instead of handling all of the local producer/partition items while ignoring the partitions parameter
(as topicPartitionWriters
does at [8]), I prefer a simple check like the one at [9]; I will verify it. A sketch of the idea follows the reference list below.
[0] https://github.com/awslabs/kinesis-kafka-connector/blob/master/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java#L280
[1] https://github.com/awslabs/kinesis-kafka-connector/blob/master/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java#L275
[2] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L397
[3] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L630
[4] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L652
[5] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L447
[6] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L650
[7] https://github.com/confluentinc/kafka-connect-storage-cloud/pull/322/files
[8] https://github.com/confluentinc/kafka-connect-storage-cloud/pull/322/files#diff-16b6be2d931b0825d79f3b4c517327b4R225
[9] https://github.com/zhiyanliu/kinesis-kafka-connector/blob/master/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java#L284
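For illustration only, here is a minimal sketch of the kind of defensive check meant above: a close() that only flushes and removes producers for partitions it actually opened. The producer type and its flushSync()/destroy() methods are placeholders, not the connector's real API, and this is not the exact code at [9].

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

public class DefensiveCloseSketch {

    // One producer per assigned partition, populated in open().
    private final Map<TopicPartition, KinesisProducerStub> producerMap = new ConcurrentHashMap<>();

    // A rebalance can interrupt the open/close cycle, so close() may receive
    // partitions that were never opened; only touch partitions we actually own.
    public void close(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            KinesisProducerStub producer = producerMap.remove(tp);
            if (producer == null) {
                // Never opened for this partition: nothing to flush or destroy.
                continue;
            }
            producer.flushSync();
            producer.destroy();
        }
    }

    // Placeholder standing in for the real Kinesis producer type.
    interface KinesisProducerStub {
        void flushSync();
        void destroy();
    }
}
```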
@zhiyanliu what would be the fastest way to reproduce the issue? Just keep it rebalancing until the out-of-sync state happens? Topic repartitioning?
@bdesert I think so at the moment: trigger rebalancing continually until it happens. According to the three causes listed in the doc [0], besides topic repartitioning you can also trigger it in the other ways described there (a minimal repartitioning sketch follows the reference below).
In my case, we hit this in a dedicated environment I can't touch, so I didn't try to reproduce it locally; I analyzed it by reading the code logic statically.
[0] https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership/
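As a rough illustration of the topic-repartitioning route, the following AdminClient snippet increases the partition count of the sink topic, which changes the topic metadata and forces the connector's consumer group to rebalance. The topic name, target partition count, and bootstrap address are placeholders, and whether this alone reproduces the out-of-sync state depends on timing.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class TriggerRebalanceSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Growing the partition count changes topic metadata, which makes the
            // connector's consumer group rebalance and re-run open()/close().
            admin.createPartitions(Map.of("sink-topic", NewPartitions.increaseTo(12)))
                 .all()
                 .get();
        }
    }
}
```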
@akhon, can you confirm from your side that the fix is working as expected?
Not relevant anymore.
I tried to start the connector with
Unfortunately, after some promising messages:
I get the following messages:
What can I do to make it work?