confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector
Other
12 stars 396 forks source link

Incremental Co-operative Rebalancing Support for HDFS Connector #711

Open ychernysh opened 3 days ago

ychernysh commented 3 days ago

Problem

See https://github.com/confluentinc/kafka-connect-hdfs/issues/625. If consumer.partition.assignment.strategy is set to org.apache.kafka.clients.consumer.CooperativeStickyAssignor in config/connect-distributed.properties, after a partial partition revocation (say, a new worker joins and takes over some partitions from some other worker) in a task it will be killed due to such NullPointerException:

[2024-11-15 13:46:37,190] ERROR [hdfs-sink-2|task-1] WorkerSinkTask{id=hdfs-sink-2-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:364)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:133)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
        ... 11 more

The reason for that is that during onPartitionsRevoked callback the DataWriter currently closes and removes all of its TopicPartitionWriters, probably assuming eager rebalancing. If the consumer only gets some partitions revoked and none new assigned, the onPartitionsAssigned will be called with empty collection and the topicPartitionWriters collection will remain empty. When new data arrives (from partitions that the consumer kept ownership of) to HdfsSinkTask#put, the NullPointerException will be thrown when accessing the topicPartitionsWriters.

Solution

Only close and remove from topicPartitionsWriters those partitions retrieved from HdfsSinkTask#close(Collection<TopicPartition>). Note: there is a 9 years old comment explaining why should we close all of the topic partition writers, which I didn't really understand. This solution simply ignores and removes it. @ewencp, can you please review and put some comments if it would be safe to do so?

Does this solution apply anywhere else?
If yes, where?

Test Strategy

Added HdfsSinkTaskTest#testPartialRevocation unit test simulating a partial revocation that throws NullPointerException without the fix. Plus, tested manually (after adding a second worker (partial revocation happens) and writing some data to topic no NPE is thrown).

Testing done:

Release Plan

Please see a similar issue for Connect framework itself, KAFKA-12487. I was testing the fix on Kafka 7.7.2-18-ccs where this fix is already present, but haven't tested on earlier versions without it. Should I do it?

confluent-cla-assistant[bot] commented 3 days ago

Please sign the Contributor License Agreement here before this PR can be approved.
:x: ychernysh
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.