Open dumontxiong opened 2 weeks ago
Hi @dumontxiong - it is hard to tell whether this is a bug or not. It is possible to get no commit response if something went wrong with the Kafka cluster, but without more information I cannot tell whether that was the cause here. In general, the PERIODIC_CONSUMER_SYNC commit mode has two possible execution paths, depending on which thread requests the commit: 1) if the commit is requested on the thread that owns the Committer (the broker poll thread), the commit happens immediately as a blocking commit request; 2) if the commit is requested by a different thread, the request is added to the commitRequest queue and that thread waits for a matching response on the commitResponseQueue. In the second case the commit itself is still processed on the broker poll thread, as part of the control loop.
I cannot tell what happened in this case or why the response is missing, but the debug logs should show when a commit request is picked off the request queue (and if it is not picked up or processed, there may be something showing why not). Can you provide more complete logs? I can see that you have debug logging enabled at least for ConsumerOffsetCommitter, so that may help to clarify things.
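To make the two paths concrete, here is a minimal sketch of the handshake, not the actual parallel-consumer code. The class CommitHandshakeSketch, its fields, and the use of plain strings as request ids are all hypothetical, and unlike the real committer it does not match responses to request ids. It only illustrates why a waiting thread times out when the owning thread's control loop never services the request queue.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; names and structure are assumptions.
class CommitHandshakeSketch {
    private final BlockingQueue<String> commitRequestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> commitResponseQueue = new LinkedBlockingQueue<>();
    private final Thread owningThread;    // stands in for the broker poll thread
    private final Duration commitTimeout;
    int commitsDone = 0;                  // only ever written by the owning thread

    CommitHandshakeSketch(Thread owningThread, Duration commitTimeout) {
        this.owningThread = owningThread;
        this.commitTimeout = commitTimeout;
    }

    /** Entry point that any thread may call. */
    void commit(String requestId) throws InterruptedException {
        if (Thread.currentThread() == owningThread) {
            // Path 1: caller owns the committer, so commit right away.
            doCommit();
            return;
        }
        // Path 2: hand the request to the owning thread and wait for a response.
        commitRequestQueue.add(requestId);
        String response = commitResponseQueue.poll(commitTimeout.toMillis(), TimeUnit.MILLISECONDS);
        if (response == null) {
            // poll() returns null when the timeout elapses with nothing queued.
            throw new IllegalStateException(
                    "Timeout waiting for commit response " + commitTimeout + " to request " + requestId);
        }
    }

    /** Called by the owning thread on each pass of its control loop. */
    void maybeDoCommit() {
        String requestId = commitRequestQueue.poll(); // non-blocking
        if (requestId != null) {
            doCommit();
            commitResponseQueue.add(requestId);
        }
    }

    private void doCommit() {
        commitsDone++; // placeholder for the blocking commit to the broker
    }

    public static void main(String[] args) throws Exception {
        CommitHandshakeSketch committer =
                new CommitHandshakeSketch(Thread.currentThread(), Duration.ofMillis(200));

        committer.commit("direct"); // path 1: runs immediately on the owning thread
        System.out.println("commits after direct call: " + committer.commitsDone);

        Thread worker = new Thread(() -> {
            try {
                committer.commit("queued"); // path 2: waits on the response queue
                System.out.println("worker received a response");
            } catch (IllegalStateException | InterruptedException e) {
                System.out.println("worker failed: " + e.getMessage());
            }
        });
        worker.start();
        worker.join(); // the control loop never runs here, so the worker times out
    }
}
```

In this sketch the timeout is reached only because maybeDoCommit() is never called while the worker waits. Anything that stalls or delays the owning thread's loop for longer than the commit timeout would produce the same symptom.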
Or if you can provide a minimal test application or integration test that reproduces this issue - that would be even better :)
Hi team, version 0.5.3.1
InternalRuntimeException: In my test scenario 50% of records fail and there are 1000 keys in total. parallelConsumer runs for a while and then exits due to an InternalRuntimeException at 24/09/13 21:33:37.130: io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=79c3ac04-b8c7-4dc2-9b09-c77d6ad6bee4, requestedAtMs=1726234432425)
And we can see the code from ConsumerOffsetCommitter.commitAndWait()
CommitResponse take = commitResponseQueue.poll(commitTimeout.toMillis(), TimeUnit.MILLISECONDS); // blocks, drain until we find our response
Because take is null, the InternalRuntimeException is thrown.
Metrics from pc_processed_records_total: during this time, there are no successful records:
Adding commit response to queue:
The logs below from ConsumerOffsetCommitter.maybeDoCommit() show that the last time a commit response was added to the queue was 24/09/13 21:16:54.105
Waiting on a commit response:
And the logs from ConsumerOffsetCommitter.commitAndWait() show that the last time it waited for a commit response from the queue was 24/09/13 21:33:52.426
Here are my concerns: The first time, the commit response was added to the queue at 24/09/13 21:16:54.105 and the wait for a commit response started at 24/09/13 21:16:54.084, so the response arrived within 30s. The second time, the wait for a commit response started at 24/09/13 21:33:39.194 but no commit response was ever added to the queue, which leads to the InternalRuntimeException.
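For cross-checking these timestamps, the requestedAtMs field in the exception is epoch milliseconds and can be rendered in the same format as the log lines. The sketch below does that with java.time. The class name RequestedAtSketch is hypothetical, and the UTC+8 offset is an assumption about the log timezone, which the logs themselves do not state.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for lining up requestedAtMs with the log timestamps.
class RequestedAtSketch {
    // Same layout as the log lines quoted in this issue, e.g. 24/09/13 21:16:54.105
    private static final DateTimeFormatter LOG_FORMAT =
            DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss.SSS");

    static String format(long epochMs, ZoneOffset offset) {
        return LOG_FORMAT.format(Instant.ofEpochMilli(epochMs).atOffset(offset));
    }

    public static void main(String[] args) {
        long requestedAtMs = 1726234432425L; // value from the exception message

        System.out.println(format(requestedAtMs, ZoneOffset.UTC));        // 24/09/13 13:33:52.425
        System.out.println(format(requestedAtMs, ZoneOffset.ofHours(8))); // 24/09/13 21:33:52.425 (assumed log offset)
    }
}
```

Under the UTC+8 assumption, the request in the exception was created at 21:33:52.425, one millisecond before the 21:33:52.426 wait entry reported from commitAndWait().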
please help to check
BRS, Dumont