confluentinc / parallel-consumer

Parallel Apache Kafka client wrapper with per message ACK, client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.
https://confluent.io/confluent-accelerators/#parallel-consumer
Apache License 2.0

Sporadic timeouts from ConsumerOffsetCommitter.CommitRequest #809

Open tedcaozoom opened 3 days ago

tedcaozoom commented 3 days ago

Our applications in GKE began having sporadic timeouts with the following exception starting 6/7/2024. We have ruled out code problems and infrastructure problems, and we also tested the newest version of Parallel Consumer without success. We do not know what's going on.

    java.util.concurrent.ExecutionException: io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=5d9ed827-0261-4a06-907c-4010bfe919c8, requestedAtMs=1718620540268)
        at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:559)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:539)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:518)
        at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDontDrainFirst(DrainingCloseable.java:61)
        at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerContainer.close(ParallelConsumerContainer.java:47)
        at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerGroup.lambda$stop$1(ParallelConsumerGroup.java:99)
        at java.base/java.lang.Iterable.forEach(Unknown Source)
        at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerGroup.stop(ParallelConsumerGroup.java:97)
        at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:248)
        at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:54)
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:374)
        at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:207)
        at org.springframework.context.support.DefaultLifecycleProcessor.onClose(DefaultLifecycleProcessor.java:130)
        at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1070)
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.doClose(ServletWebServerApplicationContext.java:174)
        at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:1024)
        at org.springframework.boot.SpringApplicationShutdownHook.closeAndWait(SpringApplicationShutdownHook.java:145)
        at java.base/java.lang.Iterable.forEach(Unknown Source)
        at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114)
        at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=5d9ed827-0261-4a06-907c-4010bfe919c8, requestedAtMs=1718620540268)
        at io.confluent.parallelconsumer.internal.InternalRuntimeException.msg(InternalRuntimeException.java:23)
        at io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter.commitAndWait(ConsumerOffsetCommitter.java:154)
        at io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter.commit(ConsumerOffsetCommitter.java:74)
        at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:346)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1231)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:622)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$10(AbstractParallelEoSStreamProcessor.java:751)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        ... 1 common frames omitted
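
The `Caused by` section suggests that, during close, the offset commit is issued as a request (`ConsumerOffsetCommitter.commit` → `commitAndWait`) and the closing thread then blocks for a response for at most `PT30S` (ISO-8601 notation for a 30-second `java.time.Duration`), failing when no response arrives in time. The sketch below reproduces only that general request/wait-with-timeout shape using plain `java.util.concurrent`; it is an illustration under that assumption, not Parallel Consumer's actual code, and `CommitWaitSketch` and its `commitAndWait` method are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CommitWaitSketch {

    /**
     * Hypothetical stand-in for a commit request: the caller blocks until another
     * thread (e.g. a poll/commit thread) completes the future, or the timeout elapses.
     */
    static String commitAndWait(CompletableFuture<String> response, Duration timeout)
            throws InterruptedException, ExecutionException {
        try {
            return response.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Mirrors the shape of the reported error message.
            throw new IllegalStateException("Timeout waiting for commit response " + timeout, e);
        }
    }

    public static void main(String[] args) throws Exception {
        // "PT30S" in the log is an ISO-8601 Duration of 30 seconds.
        System.out.println(Duration.parse("PT30S").getSeconds()); // prints 30

        // Case 1: the responding thread answers in time.
        CompletableFuture<String> answered = new CompletableFuture<>();
        new Thread(() -> answered.complete("committed")).start();
        System.out.println(commitAndWait(answered, Duration.ofSeconds(5)));

        // Case 2: nobody ever answers, so the waiter gives up after the timeout.
        try {
            commitAndWait(new CompletableFuture<>(), Duration.ofMillis(100));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Read this way, the error means the thread that should have answered the commit request did not do so within 30 seconds; why it did not is exactly what the logs and metrics requested below would need to show.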

tedcaozoom commented 3 days ago

Also, the timeouts are not limited to the offset committer; here are some from sending messages:


    java.lang.InterruptedException: null
        at java.base/java.util.concurrent.FutureTask.awaitDone(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
        at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:120)
        at com.zoominfo.kafka.producer.ProtobufKafkaMessageSender.send(ProtobufKafkaMessageSender.java:43)
        at com.zoominfo.personincremental.trace.KafkaTraceRegistry.register(KafkaTraceRegistry.java:50)
        at com.zoominfo.traces.CompositeTraceRegistry.register(CompositeTraceRegistry.java:18)
        at com.zoominfo.component.messaging.person.PersonBaseMessageHandler.handleMessage(PersonBaseMessageHandler.java:73)
        at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerContainer.lambda$start$0(ParallelConsumerContainer.java:29)
        at io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun(UserFunctions.java:61)
        at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$poll$0(ParallelEoSStreamProcessor.java:54)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$runUserFunctionInternal$17(AbstractParallelEoSStreamProcessor.java:1323)
        at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:65)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunctionInternal(AbstractParallelEoSStreamProcessor.java:1323)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunction(AbstractParallelEoSStreamProcessor.java:1274)
        at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$submitWorkToPoolInner$14(AbstractParallelEoSStreamProcessor.java:928)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
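
In this second trace the exception is an `InterruptedException` thrown from `FutureTask.get()`: the worker thread running the user function was interrupted while it was blocked waiting for a Kafka send to be acknowledged. The trace does not show which component issued the interrupt. As a plain-JDK illustration only (not Parallel Consumer code; `InterruptedSendSketch` and `blockingSend` are hypothetical stand-ins for the reporter's `ProtobufKafkaMessageSender.send`), interrupting a pool thread that is blocked in `Future.get()`, here via `ExecutorService.shutdownNow()`, produces exactly this exception. Restoring the interrupt flag, as shown, is the usual idiom for code that catches it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptedSendSketch {

    /** Hypothetical blocking send: waits for an acknowledgement that never arrives. */
    static void blockingSend(Future<?> ack, CountDownLatch started)
            throws ExecutionException, InterruptedException {
        started.countDown();
        ack.get(); // blocks until acknowledged, or throws InterruptedException if interrupted
    }

    /** Runs one blocked "send" on a pool thread, then interrupts it via shutdownNow(). */
    static Throwable runAndInterrupt() throws InterruptedException {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();

        workers.submit(() -> {
            try {
                // A FutureTask that is never run, so get() would block forever.
                blockingSend(new FutureTask<Void>(() -> null), started);
            } catch (InterruptedException e) {
                failure.set(e);
                Thread.currentThread().interrupt(); // restore the interrupt flag
            } catch (ExecutionException e) {
                failure.set(e);
            } finally {
                finished.countDown();
            }
        });

        started.await();       // the task has reached the blocking call
        workers.shutdownNow(); // interrupts the running task
        finished.await(5, TimeUnit.SECONDS);
        return failure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndInterrupt());
    }
}
```

If something similar is happening here, these send failures may be a side effect of the consumer shutting down rather than an independent problem, but that is an inference the logs would have to confirm.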

rkolesnev commented 3 days ago

Hi @tedcaozoom,

Can you please provide more information on the architecture, configuration and environment?

Are there specific steps that would let us reproduce the issue in a test / synthetic environment? Alternatively, can you provide sample code that reproduces the observed issue?

Configuration

Consumer configuration

Setup

Details about the issue observed

Please provide logs spanning the period when the issue in question is observed.

Please provide any metrics collected for both the Kafka cluster / topic and the Parallel Consumer application (including KafkaConsumer and KafkaProducer metrics) for the period when the issue in question is observed.

From the stack traces above:

tedcaozoom commented 1 day ago

@rkolesnev I will try to get more info as soon as I can, but I have a more urgent need. I need our pods in GKE to be able to restart themselves when they encounter this problem: it basically kills our pods and we have to restart them manually (as soon as they are restarted, they work fine for a while). So I would like to know how best to build the health checks.

Currently we do the following, and it's NOT working, so apparently ParallelConsumerGroup's containers are NOT closed or failed. How else can we do this? ParallelConsumerGroup.isRunning()? Or something else that indicates a problem? We need GKE to restart the pods on its own to buy us time to diagnose this further.

    if (ParallelConsumerGroup.getContainers().stream().anyMatch(c -> c.getProcessor().isClosedOrFailed())) {
        return Health.down().build();
    }

tedcaozoom commented 19 hours ago

Just to report: it appears that ParallelConsumerGroup.isRunning() works correctly.

So basically, !ParallelConsumerGroup.isRunning() means the pod needs to be restarted.

One more piece of information: when we encounter these timeouts, ParallelConsumerGroup stops running, i.e. ParallelConsumerGroup.isRunning() becomes false.
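
Putting the observations in this thread together, a health check could keep the original `isClosedOrFailed()` test but also report DOWN when the group is no longer running, since that is the signal reported to flip when the timeouts occur. The sketch below is plain Java so it runs without Spring; `HealthCheckSketch`, `ConsumerGroupView`, `ContainerView` and the `Status` enum are hypothetical stand-ins for the reporter's `ParallelConsumerGroup` / container classes and for Spring Boot's `Health`. In a Spring Boot `HealthIndicator`, the DOWN branch would correspond to `Health.down().build()` and the other to `Health.up().build()`. For GKE to restart the pod, the Kubernetes liveness probe has to target a health endpoint that actually includes this indicator.

```java
import java.util.ArrayList;
import java.util.List;

public class HealthCheckSketch {

    enum Status { UP, DOWN }

    /** Hypothetical view of one container's processor (cf. c.getProcessor().isClosedOrFailed()). */
    interface ContainerView {
        boolean isClosedOrFailed();
    }

    /** Hypothetical view of the group (cf. ParallelConsumerGroup.isRunning() / getContainers()). */
    interface ConsumerGroupView {
        boolean isRunning();
        List<ContainerView> getContainers();
    }

    /** DOWN if the group stopped running or any processor is closed/failed; UP otherwise. */
    static Status check(ConsumerGroupView group) {
        if (!group.isRunning()) {
            return Status.DOWN; // the signal reported to track the commit timeouts
        }
        boolean anyFailed = group.getContainers().stream().anyMatch(ContainerView::isClosedOrFailed);
        return anyFailed ? Status.DOWN : Status.UP;
    }

    /** Builds an in-memory group view for the examples below. */
    static ConsumerGroupView group(boolean running, boolean... containerFailed) {
        List<ContainerView> containers = new ArrayList<>();
        for (boolean failed : containerFailed) {
            containers.add(() -> failed);
        }
        return new ConsumerGroupView() {
            public boolean isRunning() { return running; }
            public List<ContainerView> getContainers() { return containers; }
        };
    }

    public static void main(String[] args) {
        System.out.println(check(group(true, false, false)));  // UP
        System.out.println(check(group(false, false, false))); // DOWN: group stopped
        System.out.println(check(group(true, false, true)));   // DOWN: one processor failed
    }
}
```

Checking `isRunning()` first keeps the cheap, group-level signal in front; the per-container check still catches a single failed processor while the group as a whole reports itself as running.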