confluentinc / kafka-connect-storage-cloud

Kafka Connect suite of connectors for Cloud storage (Amazon S3)

Prevent clearing topic-partitions that are still assigned during a rebalance #649

Open: SatyaKuppam opened this issue 1 year ago

SatyaKuppam commented 1 year ago

Problem

To reduce the impact of rebalances during rolling bounces of k8s pods, we changed partition.assignment.strategy from the default RangeAssignor to CooperativeStickyAssignor. After this change we started seeing NPEs, and the S3SinkTask went into an unrecoverable state. We did not see the same issue with StickyAssignor, however.
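The issue does not say at which level the assignor was changed. As a minimal sketch, assuming a standard Kafka Connect deployment, the setting can go on the worker with the `consumer.` prefix (applies to all sink tasks on that worker) or on a single connector with the `consumer.override.` prefix, which the worker only honours when `connector.client.config.override.policy` permits it (for example `All`). The class and method names below are illustrative only.

```java
import java.util.Properties;

public class AssignorConfigSketch {
    // Fully qualified name of the assignor class shipped with kafka-clients.
    static final String COOPERATIVE =
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";

    // Worker-level setting: applies to every sink task on this Connect worker.
    static Properties workerOverrides() {
        Properties p = new Properties();
        p.setProperty("consumer.partition.assignment.strategy", COOPERATIVE);
        return p;
    }

    // Connector-level setting: only honoured if the worker allows client overrides,
    // e.g. connector.client.config.override.policy=All in the worker config.
    static Properties connectorOverrides() {
        Properties p = new Properties();
        p.setProperty("connector.class", "io.confluent.connect.s3.S3SinkConnector");
        p.setProperty("consumer.override.partition.assignment.strategy", COOPERATIVE);
        return p;
    }

    public static void main(String[] args) {
        workerOverrides().forEach((k, v) -> System.out.println("worker:    " + k + "=" + v));
        connectorOverrides().forEach((k, v) -> System.out.println("connector: " + k + "=" + v));
    }
}
```

Reverting to RangeAssignor or StickyAssignor, as described in this thread, means setting the same key back to the corresponding class name.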

Example of the NPE (connector version 10.0.7):

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more
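One plausible reading of this trace, consistent with the issue title: with a cooperative (incremental) assignor, Kafka Connect calls the task's close() with only the revoked partitions and open() with only the newly assigned ones. A task that drops all of its per-partition writers in close(), regardless of the argument, loses the writers for partitions it still owns, and the next put() for one of them dereferences null. This would also explain why StickyAssignor, which still uses the eager protocol, does not trigger it. The sketch below is a hypothetical, simplified model of that failure mode, not the actual S3SinkTask code; partitions are plain strings and `Writer` is a stand-in class.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a sink task that keeps one writer per
// topic-partition. Partitions are plain strings such as "orders-0".
public class EagerBookkeepingSketch {
    static final class Writer {
        final List<String> buffered = new ArrayList<>();
        void buffer(String record) { buffered.add(record); }
    }

    final Set<String> assignment = new HashSet<>();
    final Map<String, Writer> writers = new HashMap<>();

    // Assumes eager semantics: every open() follows a close() of all partitions.
    void open(Collection<String> partitions) {
        assignment.clear();
        assignment.addAll(partitions);
        for (String tp : assignment) {
            writers.put(tp, new Writer());
        }
    }

    // Ignores its argument and drops every writer, including retained partitions.
    void close(Collection<String> partitions) {
        writers.clear();
        assignment.clear();
    }

    // get() returns null for a partition with no writer, so this throws an NPE.
    void put(String tp, String record) {
        writers.get(tp).buffer(record);
    }

    public static void main(String[] args) {
        EagerBookkeepingSketch task = new EagerBookkeepingSketch();
        task.open(List.of("orders-0", "orders-1"));
        // Cooperative rebalance: only orders-1 is revoked, orders-2 is added,
        // and orders-0 stays assigned without being passed to open() again.
        task.close(List.of("orders-1"));
        task.open(List.of("orders-2"));
        try {
            task.put("orders-0", "record");
        } catch (NullPointerException e) {
            System.out.println("NPE for retained partition orders-0");
        }
    }
}
```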

A possible fix is proposed in #648.
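The issue title states the direction of the fix: during a rebalance, only clear the topic-partitions that are actually revoked. The following is not the code from #648; it is a hypothetical, simplified model of that bookkeeping, assuming string partition names and a stand-in `Writer` class. close() closes and forgets only the partitions it is given, and open() adds writers without discarding existing ones.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: per-partition bookkeeping that works under
// both eager and cooperative (incremental) rebalancing.
public class IncrementalBookkeepingSketch {
    static final class Writer {
        boolean closed = false;
        void close() { closed = true; }
    }

    final Map<String, Writer> writers = new HashMap<>();

    // Create writers only for newly assigned partitions; keep existing ones.
    void open(Collection<String> partitions) {
        for (String tp : partitions) {
            writers.computeIfAbsent(tp, k -> new Writer());
        }
    }

    // Close and forget only the partitions actually being revoked.
    void close(Collection<String> partitions) {
        for (String tp : partitions) {
            Writer w = writers.remove(tp);
            if (w != null) {
                w.close();
            }
        }
    }

    public static void main(String[] args) {
        IncrementalBookkeepingSketch task = new IncrementalBookkeepingSketch();
        task.open(List.of("orders-0", "orders-1"));
        task.close(List.of("orders-1"));   // cooperative: only orders-1 revoked
        task.open(List.of("orders-2"));    // cooperative: only orders-2 added
        System.out.println(task.writers.keySet().containsAll(List.of("orders-0", "orders-2")));
    }
}
```

Under eager rebalancing the same logic still holds, because close() then receives every owned partition and open() the full new assignment.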

SatyaKuppam commented 1 year ago

@pbadani looping you in on this, since you were the last committer.

BDeus commented 1 year ago

Same for me; I had to revert to the previous assignor. Furthermore, CooperativeStickyAssignor will become the default assignor one day (see the KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248).

SatyaKuppam commented 1 year ago

Hey @BDeus, thanks for confirming. We are running the patch from the proposed PR (#648) and it is working fine with CooperativeStickyAssignor; we haven't seen any issues yet.

enzo-cappa commented 1 month ago

This is also affecting us, and it would be great to have this fixed.