confluentinc / parallel-consumer

Parallel Apache Kafka client wrapper with per-message ACK, client-side queueing, a simpler consumer/producer API with key concurrency, and extendable non-blocking IO processing.
https://confluent.io/confluent-accelerators/#parallel-consumer
Apache License 2.0
689 stars, 118 forks

NullPointerException on partitions revoked #757

Closed: Amoratinos closed this issue 1 month ago

Amoratinos commented 2 months ago

I have an application deployed on a k8s cluster. While the system is under some load and the pod is scaled up, I've seen a NullPointerException in the logs when partitions are being revoked from an existing consumer:

java.lang.NullPointerException: Cannot invoke "io.confluent.parallelconsumer.state.WorkContainer.getRetryDueAt()" because "workContainer" is null
    at i.c.parallelconsumer.state.ShardManager.lambda$new$0(ShardManager.java:81)
    at java.util.Comparator.lambda$comparing$77a9974f$1(Comparator.java:473)
    at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:220)
    at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:220)
    at java.util.TreeMap.getEntryUsingComparator(TreeMap.java:374)
    at java.util.TreeMap.getEntry(TreeMap.java:344)
    at java.util.TreeMap.remove(TreeMap.java:881)
    at java.util.TreeSet.remove(TreeSet.java:276)
    at i.c.parallelconsumer.state.ShardManager.removeWorkFromShardFor(ShardManager.java:173)
    at i.c.parallelconsumer.state.ShardManager.removeAnyShardEntriesReferencedFrom(ShardManager.java:157)
    at i.c.p.state.PartitionState.onPartitionsRemoved(PartitionState.java:545)
    at i.c.p.state.PartitionStateManager.resetOffsetMapAndRemoveWork(PartitionStateManager.java:245)
    at i.c.p.state.PartitionStateManager.onPartitionsRemoved(PartitionStateManager.java:184)
    at i.c.p.state.PartitionStateManager.onPartitionsRevoked(PartitionStateManager.java:175)
    at i.c.parallelconsumer.state.WorkManager.onPartitionsRevoked(WorkManager.java:109)
    at i.c.p.i.AbstractParallelEoSStreamProcessor.onPartitionsRevoked(AbstractParallelEoSStreamProcessor.java:421)
    ... 16 common frames omitted
Wrapped by: i.c.p.internal.InternalRuntimeException: onPartitionsRevoked event error
    at i.c.p.i.AbstractParallelEoSStreamProcessor.onPartitionsRevoked(AbstractParallelEoSStreamProcessor.java:423)
    at o.a.k.c.c.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:340)
    at o.a.k.c.c.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:830)
    ... 14 common frames omitted
Wrapped by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
    at o.a.k.c.c.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:861)
    at o.a.k.c.c.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:443)
    at o.a.k.c.c.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)
    at o.a.k.c.c.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:552)
    at o.a.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276)
    at o.a.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
    at o.a.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)
    at i.c.p.internal.ConsumerManager.poll(ConsumerManager.java:58)
    at i.c.p.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:205)
    at i.c.p.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:160)
    at i.c.p.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:136)
    at java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at j.u.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.lang.Thread.run(Thread.java:842)

From what I've checked in the code, it looks like a null WorkContainer is passed to the retryQueue TreeSet, so the exception itself seems valid (see ShardManager).

I'm on version 0.5.2.8.
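The reading of the trace above can be checked with a small standalone program. This is a hypothetical sketch: `Work` stands in for `WorkContainer`, and the comparator only mirrors the shape visible in the trace (`Comparator.comparing(...)` on the retry due time, followed by two `thenComparing(...)` calls); the second and third sort keys are assumptions, not the real `ShardManager` fields. It shows that `TreeSet.remove(null)` reaches the comparator through `TreeMap.getEntryUsingComparator` and throws a `NullPointerException` when the set is non-empty, while on an empty set no comparison happens and the call just returns `false`.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.TreeSet;

public class RetryQueueNpeDemo {

    // Hypothetical stand-in for WorkContainer: only the fields the comparator reads.
    static final class Work {
        private final Instant retryDueAt;
        private final int partition;
        private final long offset;

        Work(Instant retryDueAt, int partition, long offset) {
            this.retryDueAt = retryDueAt;
            this.partition = partition;
            this.offset = offset;
        }

        Instant getRetryDueAt() { return retryDueAt; }
        int getPartition() { return partition; }
        long getOffset() { return offset; }
    }

    // Same shape as the trace: comparing(...).thenComparing(...).thenComparing(...).
    // Each key extractor dereferences its argument, so comparing against null throws an NPE.
    static final Comparator<Work> RETRY_ORDER = Comparator
            .comparing((Work work) -> work.getRetryDueAt())
            .thenComparing(work -> work.getPartition())
            .thenComparing(work -> work.getOffset());

    /** Calls remove(null) on a retry queue; populated decides whether it holds one entry first. */
    static String tryRemoveNull(boolean populated) {
        TreeSet<Work> retryQueue = new TreeSet<>(RETRY_ORDER);
        if (populated) {
            retryQueue.add(new Work(Instant.EPOCH, 0, 42L));
        }
        try {
            // Non-empty set: TreeMap.getEntryUsingComparator calls RETRY_ORDER.compare(null, existing).
            // Empty set: the tree root is null, so no comparison runs and remove returns false.
            return retryQueue.remove(null) ? "removed" : "not found";
        } catch (NullPointerException e) {
            return "NPE";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRemoveNull(true));  // prints "NPE"
        System.out.println(tryRemoveNull(false)); // prints "not found"
    }
}
```

In this sketch the same comparator also runs on `add`, so a `null` could not have been inserted into the set in the first place; that is consistent with the null arriving as the argument to `remove`, as the reporter suspects.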

sangreal commented 2 months ago

After checking the code, this looks like a race condition. I am working on a fix.
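The thread does not describe what the race was or how the merged fix works. As a generic illustration only, the sketch below shows two common defenses for this kind of failure: all mutations of the sorted set share one lock, and bulk removal skips references that have already been cleared instead of handing `null` to the comparator. `GuardedRetryQueue` and its methods are hypothetical names, not part of the parallel-consumer API, and this is not the fix that was merged.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Objects;
import java.util.TreeSet;

/**
 * Hypothetical sketch, not the parallel-consumer fix: a retry queue whose
 * mutations share one lock and whose bulk removal skips null references.
 */
public class GuardedRetryQueue<W> {
    private final Object lock = new Object();
    private final TreeSet<W> retryQueue;

    public GuardedRetryQueue(Comparator<? super W> order) {
        this.retryQueue = new TreeSet<>(order);
    }

    public void add(W work) {
        // Reject nulls up front so they never reach the comparator.
        Objects.requireNonNull(work, "work");
        synchronized (lock) {
            retryQueue.add(work);
        }
    }

    /** Removes every non-null entry in works; returns how many were actually present. */
    public int removeAll(Collection<? extends W> works) {
        int removed = 0;
        synchronized (lock) {
            for (W work : works) {
                if (work == null) {
                    continue; // reference already cleared elsewhere: nothing to remove
                }
                if (retryQueue.remove(work)) {
                    removed++;
                }
            }
        }
        return removed;
    }

    public int size() {
        synchronized (lock) {
            return retryQueue.size();
        }
    }

    public static void main(String[] args) {
        GuardedRetryQueue<Integer> queue = new GuardedRetryQueue<>(Comparator.<Integer>naturalOrder());
        queue.add(1);
        queue.add(2);
        int removed = queue.removeAll(java.util.Arrays.asList(1, null, 3));
        System.out.println(removed + " removed, " + queue.size() + " left"); // prints "1 removed, 1 left"
    }
}
```

Skipping nulls only hides the symptom if the underlying race remains; the shared lock is what keeps the set and any related bookkeeping consistent with each other.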

rkolesnev commented 1 month ago

The fix for this is now merged. @Amoratinos, can you please test with a build from master?

Amoratinos commented 1 month ago

I can give it a try, but I only saw this error once, under heavy load 🤞

rkolesnev commented 1 month ago

Closing as fixed; reopen if it recurs.