awslabs / amazon-kinesis-client

Client library for Amazon Kinesis
Apache License 2.0

Kinesis consumer cannot checkpoint because it cannot renew the lease #537

Open BobbyJohansen opened 5 years ago

BobbyJohansen commented 5 years ago

[shardId-000000000011] Worker 885e7e2d-c00e-4145-8f3d-b4fce1f1c46b refusing to update lease with key shardId-000000000011 because concurrency tokens don't match

Our consumer sometimes fires this error out of the blue because it thinks it has lost the lease, and it then starts lagging behind our stream. I recently upgraded to `software.amazon.kinesis:amazon-kinesis-client-pom` 2.1.3, but the error still occurs. We get stuck in a processing loop because we do not call `System.exit` when a shutdown error occurs. Please help.

**Edit:** It looks like the lease coordinator does not mark shard record processors for shutdown when it cannot renew the lease because of this error. This means that when I checkpoint inside my `processRecords()` (called from `software.amazon.kinesis.lifecycle.ProcessTask#callProcessRecords()`), the checkpoint fails and I throw a Java `Error`. Even more confusing, `software.amazon.kinesis.lifecycle.ProcessTask#callProcessRecords` does not catch `Error` and trigger a shutdown. I think that either there should be an error type I can bubble up to `ProcessTask#callProcessRecords` that causes the `ShardRecordProcessor` to shut down, or the lease coordinator that hits the concurrency-token mismatch should mark the processor for shutdown. I am also confused about the difference between `software.amazon.kinesis.processor.ShardRecordProcessor#shutdownRequested` and `software.amazon.kinesis.processor.ShutdownNotificationAware#shutdownRequested`.
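One way to avoid throwing `java.lang.Error` out of `processRecords()` is to treat a checkpoint failure caused by a lost lease as a signal to stop working on the shard and return normally. The sketch below models that with hypothetical stand-in types (`LeaseLostException`, `BatchCheckpointer`, `SketchProcessor`) so it runs without the KCL dependency. In KCL 2.x the corresponding real pieces are `RecordProcessorCheckpointer#checkpoint()`, which declares `ShutdownException`, and the `ShardRecordProcessor#leaseLost` callback. Whether 2.1.3 then shuts down and recreates the shard consumer is exactly what this issue is asking, so this is application-side hygiene under those assumptions, not a fix for the underlying bug.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KCL's software.amazon.kinesis.exceptions.ShutdownException.
class LeaseLostException extends Exception {
    LeaseLostException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the checkpointer that processRecords receives.
interface BatchCheckpointer {
    void checkpoint(String sequenceNumber) throws LeaseLostException;
}

// Sketch of a record processor that stops quietly once a checkpoint reports a lost lease.
class SketchProcessor {
    private final List<String> processed = new ArrayList<>();
    private boolean leaseLost = false;

    void processRecords(List<String> sequenceNumbers, BatchCheckpointer checkpointer) {
        if (leaseLost || sequenceNumbers.isEmpty()) {
            return; // nothing to do, or this worker no longer owns the shard
        }
        processed.addAll(sequenceNumbers); // application work would go here
        String last = sequenceNumbers.get(sequenceNumbers.size() - 1);
        try {
            checkpointer.checkpoint(last);
        } catch (LeaseLostException e) {
            // Remember the loss and return normally. Throwing java.lang.Error here would
            // escape the processing thread, as in the "Thread-1" stack trace in this issue.
            leaseLost = true;
        }
    }

    boolean isLeaseLost() {
        return leaseLost;
    }

    List<String> processed() {
        return processed;
    }
}
```

The flag makes later batches for the same shard no-ops, so nothing is processed twice while the library decides what to do with the consumer.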
BobbyJohansen commented 5 years ago

I did some more analysis with KCL debug logging turned on; here are the results:

I reduced the logs to just shard 17, which is the shard where the problem occurred. It looks like the leaseCounter was off by one in these logs. There are 16 applications running and balancing leases across the stream, and in the screenshot provided it looks like the other consumers thought the current lease's leaseCounter was one higher than the value held by the shard consumer that was consuming the shard.

For example, when the shard consumer for shard 17 tried to prepare the checkpoint, we got this error:
DEBUG [ShardRecordProcessor-0001] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher] [shardId-000000000017] Lease update failed for lease with key shardId-000000000017 because the lease counter was not 15505988

A scan of the DynamoDB table 20 milliseconds earlier showed the lease as follows (note that leaseCounter is 15505989):
DEBUG [LeaseCoordinator-0000] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher] [] Got item {checkpoint=AttributeValue(S=49593949378240586316257100628612811999143984124999500050, SS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, NS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, BS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, M=software.amazon.awssdk.core.util.DefaultSdkAutoConstructMap@657e54b1, L=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67), parentShardId=AttributeValue(SS=[shardId-000000000004], NS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, BS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, M=software.amazon.awssdk.core.util.DefaultSdkAutoConstructMap@657e54b1, L=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67), leaseCounter=AttributeValue(N=15505989, SS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, NS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, BS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, M=software.amazon.awssdk.core.util.DefaultSdkAutoConstructMap@657e54b1, L=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67), checkpointSubSequenceNumber=AttributeValue(N=0, SS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, NS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, BS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, M=software.amazon.awssdk.core.util.DefaultSdkAutoConstructMap@657e54b1, L=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67), ownerSwitchesSinceCheckpoint=AttributeValue(N=0, SS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, NS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, BS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, M=software.amazon.awssdk.core.util.DefaultSdkAutoConstructMap@657e54b1, L=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67), leaseKey=AttributeValue(S=shardId-000000000017, SS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, NS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, BS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, M=software.amazon.awssdk.core.util.DefaultSdkAutoConstructMap@657e54b1, L=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67), leaseOwner=AttributeValue(S=089d4352-a291-4096-8ba2-859ff0f3011f, SS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, NS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, BS=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67, M=software.amazon.awssdk.core.util.DefaultSdkAutoConstructMap@657e54b1, L=software.amazon.awssdk.core.util.DefaultSdkAutoConstructList@5a645f67)} from DynamoDB.
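Together, the "lease counter was not 15505988" failure and the stored leaseCounter of 15505989 describe an optimistic-concurrency rejection: the update expected one counter value, but the row already held the next one. A minimal in-memory model of such a counter-guarded write follows, assuming (for illustration only) that every successful write bumps the counter by one. `LeaseRow` is a hypothetical class, not part of KCL, and `AtomicLong.compareAndSet` stands in for the DynamoDB conditional update.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory stand-in for one lease row in the lease table.
// conditionalUpdate mimics a write guarded by "leaseCounter = expected".
final class LeaseRow {
    private final AtomicLong leaseCounter;

    LeaseRow(long initialCounter) {
        this.leaseCounter = new AtomicLong(initialCounter);
    }

    // Succeeds only if the caller's expected counter matches the stored one;
    // on success the stored counter is incremented (an assumption of this model).
    boolean conditionalUpdate(long expectedCounter) {
        return leaseCounter.compareAndSet(expectedCounter, expectedCounter + 1);
    }

    long counter() {
        return leaseCounter.get();
    }
}
```

With the row at 15505989, `conditionalUpdate(15505988)` returns `false` on every retry that reuses the same stale expectation, while a caller expecting 15505989 succeeds. Retrying without refreshing the expected counter therefore cannot succeed.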

Then we get the following messages from the shard consumer for shard 17:
2019-04-09 14:57:20,876 DEBUG [LeaseRenewer-0001] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher] [] Renewing lease with key shardId-000000000017
Host: production  Name: k8s_raw-metrics-ingestion_raw-metrics-ingestion-868fc77979-xnp2c_whoop_87cbc218-5a26-11e9-8a3c-065af4310944_0  Category: App/Ingestion 
2019-04-09 14:57:20,876 INFO  [ShardRecordProcessor-0001] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRenewer] [shardId-000000000017] Worker 089d4352-a291-4096-8ba2-859ff0f3011f lost lease with key shardId-000000000017 - discovered during update

A few milliseconds later I try to checkpoint on shard 17 and get an error saying that the worker has lost the lease.
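The failure messages in this issue ("refusing to update lease ... because concurrency tokens don't match" and "could not prepare checkpoint ... because it does not hold the lease") appear to come from the worker comparing what a caller presents against its own view of the leases it holds. A minimal sketch of that kind of check follows, assuming that a worker issues a fresh random token each time it (re)acquires a lease and that updates must present the token of the current holding. `HeldLeases` and its methods are hypothetical, not KCL's API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of one worker's view of the leases it holds.
final class HeldLeases {
    enum UpdateResult { OK, NOT_HELD, TOKEN_MISMATCH }

    // leaseKey -> token issued when this worker last acquired the lease
    private final Map<String, UUID> tokens = new ConcurrentHashMap<>();

    // Acquiring (or re-acquiring) a lease always issues a brand-new token.
    UUID acquire(String leaseKey) {
        UUID token = UUID.randomUUID();
        tokens.put(leaseKey, token);
        return token;
    }

    void lose(String leaseKey) {
        tokens.remove(leaseKey);
    }

    // An update is accepted only when the caller presents the token of the current holding.
    UpdateResult update(String leaseKey, UUID callerToken) {
        UUID current = tokens.get(leaseKey);
        if (current == null) {
            return UpdateResult.NOT_HELD;
        }
        return current.equals(callerToken) ? UpdateResult.OK : UpdateResult.TOKEN_MISMATCH;
    }
}
```

In this model, a processor that captured its token before the lease was lost and re-acquired keeps presenting a stale token, so its updates are refused even though the worker holds the lease again.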

There is then an attempt to renew the lease (the log lines below are listed newest first):

2019-04-09 14:57:20,987 INFO  [ShardRecordProcessor-0001] [software.amazon.kinesis.checkpoint.dynamodb.DynamoDBCheckpointer] [shardId-000000000017] Worker 089d4352-a291-4096-8ba2-859ff0f3011f could not prepare checkpoint for shard shardId-000000000017 because it does not hold the lease
2019-04-09 14:57:20,986 DEBUG [ShardRecordProcessor-0001] [software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer] [shardId-000000000017] Preparing checkpoint shardId-000000000017, token 38c15e35-0cb4-4774-ae51-a8581bb563a1 at specific extended sequence number {SequenceNumber: 49593949378240586316257100632041325623571072533185691922,SubsequenceNumber: 0}
2019-04-09 14:57:20,971 DEBUG [Thread-1] [software.amazon.kinesis.lifecycle.ShardConsumer] [] Previous PROCESS task still pending for shard shardId-000000000017 since PT0.092047S ago.  Not submitting new task.
2019-04-09 14:57:20,971 DEBUG [LeaseRenewer-0001] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRenewer] [] Worker 089d4352-a291-4096-8ba2-859ff0f3011f successfully renewed lease with key shardId-000000000017
Exception in thread "Thread-1" java.lang.Error: [SHUTDOWN-KEY] Shutdown exception occurred
    at com.whoop.metrics.consumers.ingestion.MetricsIngestion.processRecords(MetricsIngestion.java:116)
    at software.amazon.kinesis.lifecycle.ProcessTask.callProcessRecords(ProcessTask.java:200)
    at software.amazon.kinesis.lifecycle.ProcessTask.call(ProcessTask.java:141)
    at software.amazon.kinesis.lifecycle.ShardConsumer.executeTask(ShardConsumer.java:298)
    at software.amazon.kinesis.lifecycle.ShardConsumer.processData(ShardConsumer.java:282)
    at software.amazon.kinesis.lifecycle.ShardConsumer.handleInput(ShardConsumer.java:124)
    at software.amazon.kinesis.lifecycle.ShardConsumerSubscriber.onNext(ShardConsumerSubscriber.java:145)
    at software.amazon.kinesis.lifecycle.ShardConsumerSubscriber.onNext(ShardConsumerSubscriber.java:35)
    at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
    at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
    at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:400)
    at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
    at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:261)
    at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:226)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
...
2019-04-09 14:57:20,971 INFO  [LeaseRenewer-0001] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher] [] Detected spurious renewal failure for lease with key shardId-000000000017, but recovered
2019-04-09 14:57:20,970 DEBUG [LeaseRenewer-0001] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher] [] Got lease Lease(leaseKey=shardId-000000000017, leaseOwner=089d4352-a291-4096-8ba2-859ff0f3011f, leaseCounter=15505989, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49593949378240586316257100628612811999143984124999500050,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=0, parentShardIds=[shardId-000000000004])
2019-04-09 14:57:20,953 DEBUG [LeaseRenewer-0001] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher] [] Getting lease with key shardId-000000000017
2019-04-09 14:57:20,953 DEBUG [LeaseRenewer-0001] [software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher] [] Lease renewal failed for lease with key shardId-000000000017 because the lease counter was not 15505988
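Read oldest first, the renewer entries above show a renewal rejected because the counter was not 15505988, a re-read that finds the same owner with leaseCounter=15505989, and then "Detected spurious renewal failure ... but recovered". A minimal sketch of a recovery check that would produce that sequence follows, assuming the renewer accepts a rejected conditional write when the owner is unchanged and the stored counter is exactly one ahead of its local copy. `LeaseState` and `SketchRenewer` are hypothetical and this is not KCL's source.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical snapshot of a lease row: owner plus counter (not KCL's Lease class).
final class LeaseState {
    final String owner;
    final long counter;

    LeaseState(String owner, long counter) {
        this.owner = owner;
        this.counter = counter;
    }
}

// Hypothetical renewer holding its own local copy of the lease counter.
final class SketchRenewer {
    private final AtomicReference<LeaseState> row; // stand-in for the stored lease item
    private final String workerId;
    private long localCounter;

    SketchRenewer(AtomicReference<LeaseState> row, String workerId, long localCounter) {
        this.row = row;
        this.workerId = workerId;
        this.localCounter = localCounter;
    }

    boolean renew() {
        LeaseState current = row.get();
        // Conditional write: only if we own the row and our counter copy is current.
        boolean written = workerId.equals(current.owner)
                && current.counter == localCounter
                && row.compareAndSet(current, new LeaseState(workerId, localCounter + 1));
        if (!written) {
            // Write rejected: re-read the row. If we still own it and the counter is
            // exactly one ahead of our copy, assume an earlier attempt already landed.
            LeaseState reread = row.get();
            if (!workerId.equals(reread.owner) || reread.counter != localCounter + 1) {
                return false; // the lease really was lost or changed
            }
            // "Spurious" failure: fall through and treat the renewal as successful.
        }
        localCounter++; // only this renewer's copy catches up
        return true;
    }

    long localCounter() {
        return localCounter;
    }
}
```

In this model the recovery path advances only the renewer's `localCounter`; any other component that kept its own copy of the lease would still expect the old counter. That is one way the checkpoint failures here could arise, but the logs alone do not prove it.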

From there it is a continuous loop that does not allow me to checkpoint. I think therein lies the bug; one of the following is happening:

1. The shard consumer is not shut down and recreated with a lease subscription.
2. The leaseCounter was incremented incorrectly (I did not see any application log that it was stealing the lease).
3. The shard consumer's prepare-checkpoint path is not using the refreshed lease.