Open posac opened 7 years ago
+1
+1
I have also encountered this problem. It is annoying: when it happens the worker cannot progress, which causes service downtime (it doesn't crash, but it stops working as expected). It may happen when an instance of IRecordProcessor checkpoints specific records (sequence numbers) instead of using IRecordProcessorCheckpointer.checkpoint().
Here is how it is exhibited in logs:
2018-07-20 22:00:20,469 DEBUG StreamsRecordProcessor> Processing 1 records
2018-07-20 22:00:20,480 INFO KafkaStreamsRecordProcessor> KafkaSendCompleted to 'devices-ingress' at 14:a4
2018-07-20 22:00:20,486 INFO SequenceNumberValidator> Validated sequence number 1434700000000005227153817 with shard id shardId-00000001532110040782-92c71e5b
2018-07-20 22:00:25,464 INFO Worker> Current stream shard assignments: shardId-00000001532110040782-92c71e5b
2018-07-20 22:00:25,464 INFO Worker> Sleeping ...
2018-07-20 22:00:35,466 INFO ProcessTask> Reached end of shard shardId-00000001532110040782-92c71e5b
2018-07-20 22:00:36,466 INFO StreamsRecordProcessor> Shutting down with status TERMINATE
2018-07-20 22:00:36,471 INFO SequenceNumberValidator> Validated sequence number 1434700000000005227153817 with shard id shardId-00000001532110040782-92c71e5b
2018-07-20 22:00:36,471 ERROR ShutdownTask> Application exception.
java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-00000001532110040782-92c71e5b
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:113) [service.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) [service.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) [service.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
at java.lang.Thread.run(Thread.java:844) [?:?]
Here is what happened:
1. While records are being processed and checkpointed, RecordProcessorCheckpointer.sequenceNumberAtShardEnd is null.
2. At the end of the shard, IRecordProcessor.shutdown(ShutdownInput) is called with ShutdownReason.TERMINATE. I flush processing and commit the last record I know about. This is the record that was processed at 22:00:20 in the log above, and it is the last record in the shard.
3. RecordProcessorCheckpointer.advancePosition(ExtendedSequenceNumber) sets checkpointToRecord to SHARD_END, but the condition mentioned by @posac is false (this sequence number was already checkpointed), so SHARD_END is not checkpointed.
4. The KCL throws IllegalArgumentException and retries a second later. This repeats every second and the application makes no progress.

It can be worked around by calling IRecordProcessorCheckpointer.checkpoint() in IRecordProcessor.shutdown(), which checkpoints RecordProcessorCheckpointer.largestPermittedCheckpointValue (it is set to SHARD_END once we are at shard end). If you process your records asynchronously, make sure that all records have been processed before you checkpoint this way.
The correction suggested by @posac will solve the problem.
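To make the workaround easier to follow, here is a small self-contained model of it. This is only a sketch: the Checkpointer class, markShardEnd() and shutdownSucceeds() are hypothetical stand-ins written for illustration, not KCL classes, and the way sequenceNumberAtShardEnd gets its value in markShardEnd() is an assumption about the library internals. Only the field names and the two checkpoint variants come from the discussion above.

```java
// Simplified model of the workaround; these types are stand-ins, not KCL classes.
class ShutdownWorkaroundModel {
    static final String SHARD_END = "SHARD_END";

    /** Stand-in for RecordProcessorCheckpointer, reduced to the fields named above. */
    static class Checkpointer {
        String largestPermittedCheckpointValue;
        String sequenceNumberAtShardEnd; // null until the shard end is reached
        String lastCheckpointValue;      // last value actually stored

        // No-argument variant: checkpoints the largest permitted value.
        void checkpoint() { advancePosition(largestPermittedCheckpointValue); }

        // Variant that checkpoints a specific sequence number.
        void checkpoint(String sequenceNumber) { advancePosition(sequenceNumber); }

        private void advancePosition(String sequenceNumber) {
            String checkpointToRecord = sequenceNumber;
            if (sequenceNumberAtShardEnd != null && sequenceNumberAtShardEnd.equals(sequenceNumber)) {
                checkpointToRecord = SHARD_END;
            }
            // Condition as quoted in this issue: compares the input, not checkpointToRecord.
            if (sequenceNumber != null && !sequenceNumber.equals(lastCheckpointValue)) {
                lastCheckpointValue = checkpointToRecord;
            }
        }

        // Assumption: at shard end the last permitted value becomes sequenceNumberAtShardEnd.
        void markShardEnd() {
            sequenceNumberAtShardEnd = largestPermittedCheckpointValue;
            largestPermittedCheckpointValue = SHARD_END;
        }
    }

    /** Replays the sequence from the log and reports whether SHARD_END was stored. */
    static boolean shutdownSucceeds(boolean useNoArgCheckpoint) {
        Checkpointer c = new Checkpointer();
        String last = "1434700000000005227153817";
        c.largestPermittedCheckpointValue = last;
        c.checkpoint(last);      // checkpoint made while processing the last record
        c.markShardEnd();        // "Reached end of shard"
        if (useNoArgCheckpoint) {
            c.checkpoint();      // workaround used in shutdown(TERMINATE)
        } else {
            c.checkpoint(last);  // re-checkpointing the same sequence number
        }
        return SHARD_END.equals(c.lastCheckpointValue);
    }

    public static void main(String[] args) {
        System.out.println("checkpoint(sequenceNumber) in shutdown: " + shutdownSucceeds(false));
        System.out.println("checkpoint() in shutdown: " + shutdownSucceeds(true));
    }
}
```

In this model the no-argument call passes SHARD_END itself into advancePosition, so the "already checkpointed" comparison no longer blocks the write.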
Hi,
I have an issue with checkpoints and ending shards. In my processor, after each processRecords call I call checkpoint with a sequence number. The sequence number is also stored in a processor field.
On shutdown I also call: if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { checkpoint(shutdownInput.getCheckpointer()); }
But after "Reached end of shard" I get this exception:
java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-00000001505383995695-aa7bbe9c at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:107) ~[amazon-kinesis-client-1.8.1.jar:?]
I have debugged your code, and I think there is a bug in RecordProcessorCheckpointer.java, in the method advancePosition.
I believe that the condition:
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
should be:
if (checkpointToRecord != null && !checkpointToRecord.equals(lastCheckpointValue)) {
Otherwise I am not able to store SHARD_END: when processRecords calls checkpoint, sequenceNumberAtShardEnd is not set yet. It is set by the time checkpoint is called from shutdown, but then the condition above is not satisfied, because lastCheckpointValue is equal to extendedSequenceNumber.
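The difference between the two conditions can be replayed with a small model. This is a sketch, not the library source: AdvancePositionModel, the useProposedCondition flag and replay() are hypothetical names made up for illustration, and sequence numbers are plain strings instead of ExtendedSequenceNumber. The field names and both conditions are the ones quoted above.

```java
// Simplified stand-in for RecordProcessorCheckpointer.advancePosition; not the KCL source.
class AdvancePositionModel {
    static final String SHARD_END = "SHARD_END";

    final boolean useProposedCondition;
    String sequenceNumberAtShardEnd; // null until the end of the shard is reached
    String lastCheckpointValue;      // last value actually stored

    AdvancePositionModel(boolean useProposedCondition) {
        this.useProposedCondition = useProposedCondition;
    }

    void advancePosition(String extendedSequenceNumber) {
        String checkpointToRecord = extendedSequenceNumber;
        if (sequenceNumberAtShardEnd != null
                && sequenceNumberAtShardEnd.equals(extendedSequenceNumber)) {
            checkpointToRecord = SHARD_END;
        }
        // Original condition compares the input; proposed condition compares the value to store.
        String compared = useProposedCondition ? checkpointToRecord : extendedSequenceNumber;
        if (compared != null && !compared.equals(lastCheckpointValue)) {
            lastCheckpointValue = checkpointToRecord;
        }
    }

    /** Replays: checkpoint in processRecords, shard end reached, checkpoint in shutdown. */
    static String replay(boolean useProposedCondition) {
        AdvancePositionModel m = new AdvancePositionModel(useProposedCondition);
        String last = "1434700000000005227153817";
        m.advancePosition(last);           // called from processRecords
        m.sequenceNumberAtShardEnd = last; // shard end is now known
        m.advancePosition(last);           // called from shutdown with TERMINATE
        return m.lastCheckpointValue;
    }

    public static void main(String[] args) {
        System.out.println("original condition: " + replay(false));
        System.out.println("proposed condition: " + replay(true));
    }
}
```

With the original condition the model ends with the plain sequence number stored, which corresponds to the "Application didn't checkpoint at end of shard" failure. With the proposed condition it ends with SHARD_END.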