micronaut-projects / micronaut-kafka

Integration between Micronaut and Apache Kafka
Apache License 2.0
86 stars 107 forks source link

NPE in ConsumerStateBatch.getCurrentRetryCount #1004

Open mpkorstanje opened 7 months ago

mpkorstanje commented 7 months ago

Expected Behavior

Given a Kafka listener with retry enabled, when an exception is thrown from the listener, it should be retried.

@KafkaListener(
    groupId = "example_failure-test",
    batch = true,
    errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = MAX_VALUE, retryDelay = "5s"),
    offsetReset = EARLIEST,
    offsetStrategy = OffsetStrategy.DISABLED
)
class BulkItemConsumer {

  @Topic("t.failure-test.test")
  void consume(List<ConsumerRecord<String, BulkItemOnIndex>> records, Consumer<String, BulkItemOnIndex> kafkaConsumer) throws IOException, ExecutionException {
     // An exception is thrown here to trigger the retry
  }

}

Actual Behaviour

2024-04-02 12:10:45.101Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Successfully joined group with generation Generation{generationId=1, memberId='failure-test-example-consumer-acb506c5-c0b1-49a1-8d80-e7eb491a69cc', protocol='range'}
2024-04-02 12:10:45.101Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Finished assignment for group at generation 1: {failure-test-example-consumer-acb506c5-c0b1-49a1-8d80-e7eb491a69cc=Assignment(partitions=[t.failure-test.test-0])}
2024-04-02 12:10:45.104Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Successfully synced group in generation Generation{generationId=1, memberId='failure-test-example-consumer-acb506c5-c0b1-49a1-8d80-e7eb491a69cc', protocol='range'}
2024-04-02 12:10:45.105Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Notifying assignor about the new Assignment(partitions=[t.failure-test.test-0])
2024-04-02 12:10:45.105Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Adding newly assigned partitions: t.failure-test.test-0
2024-04-02 12:10:45.106Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Found no committed offset for partition t.failure-test.test-0
2024-04-02 12:10:45.107Z | INFO  | pool-7-thread-1      | o.a.k.c.consumer.internals.SubscriptionState  | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Resetting offset for partition t.failure-test.test-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}}.

// Some log lines indicating the consumer was called and threw an exception

2024-04-02 12:10:45.123Z | ERROR | pool-7-thread-1      | i.m.c.k.e.KafkaListenerExceptionHandler       | Kafka consumer [com.example.ExampleConsumer@3d39e8d0] produced error: Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because the return value of "java.util.Map.get(Object)" is null
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because the return value of "java.util.Map.get(Object)" is null
    at io.micronaut.configuration.kafka.processor.ConsumerStateBatch.lambda$getCurrentRetryCount$3(ConsumerStateBatch.java:173)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1707)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.IntPipeline.reduce(IntPipeline.java:520)
    at java.base/java.util.stream.IntPipeline.max(IntPipeline.java:483)
    at io.micronaut.configuration.kafka.processor.ConsumerStateBatch.getCurrentRetryCount(ConsumerStateBatch.java:175)
    at io.micronaut.configuration.kafka.processor.ConsumerStateBatch.resolveWithErrorStrategy(ConsumerStateBatch.java:149)
    at io.micronaut.configuration.kafka.processor.ConsumerStateBatch.processRecords(ConsumerStateBatch.java:93)
    at io.micronaut.configuration.kafka.processor.ConsumerState.pollAndProcessRecords(ConsumerState.java:212)
    at io.micronaut.configuration.kafka.processor.ConsumerState.refreshAssignmentsPollAndProcessRecords(ConsumerState.java:164)
    at io.micronaut.configuration.kafka.processor.ConsumerState.threadPollLoop(ConsumerState.java:154)
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
    at io.micrometer.core.instrument.Timer.lambda$wrap$0(Timer.java:193)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
2024-04-02 12:10:45.124Z | INFO  | pool-7-thread-1      | i.m.c.kafka.processor.KafkaConsumerProcessor  | Consumer [failure-test-example-consumer] assignments changed: [] -> [t.failure-test.test-0]

// And at this point the application shuts down

2024-04-02 12:11:15.217Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Revoke previously assigned partitions t.failure-test.test-0
2024-04-02 12:11:15.218Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Member failure-test-example-consumer-acb506c5-c0b1-49a1-8d80-e7eb491a69cc sending LeaveGroup request to coordinator localhost:9092 (id: 2147482646 rack: null) due to the consumer is being closed
2024-04-02 12:11:15.218Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Resetting generation and member id due to: consumer pro-actively leaving the group
2024-04-02 12:11:15.218Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Request joining group due to: consumer pro-actively leaving the group

Steps To Reproduce

I can only reproduce this problem on a rather slow CI machine. I've tried to reproduce this locally, with no success. If anything I wouldn't expect an NPE and suspect that this may be due to a race condition. At a glance it looks like the partitions and currentOffsets are collected from Kafka at different times and may be subject to change.

Environment Information

Ci uses the following docker images:

Example Application

No response

Version

jeremyg484 commented 7 months ago

It is tough to provide a known exact fix for this without a definite reproducer (and I have not successfully been able to reproduce it in our test suite so far either), but I do think we should at least make some changes in ConsumerStateBatch to code a little more defensively. currentOffsets in particular is marked as @Nullable in most of the methods where it is passed around, and yet we don't seem to be doing anything to verify that is it not null when we should be.

My best guess is that it is a timing problem in the "slow" CI machine and the consumer hasn't yet been assigned a partition by Kafka (or it could be in the midst of being reassigned), and we need to account for that state.

mpkorstanje commented 7 months ago

My best guess is that it is a timing problem in the "slow" CI machine and the consumer hasn't yet been assigned a partition by Kafka (or it could be in the midst of being reassigned), and we need to account for that state.

I would concur with that guess. The current workaround is to run a passing test case before testing the failure, giving Kafka the time to assign partitions and finish balancing.

Defensive programming would make sense. I do think the @NonNull annotation are a red herring though.

https://github.com/micronaut-projects/micronaut-kafka/blob/01f2e378c4f1761eeb0c1e6651325faab893816b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerStateBatch.java#L170-L176

For this specific exception, it is not currentOffsets that is null, but rather currentOffsets.get(tp) returns null.