awslabs / amazon-kinesis-client

Client library for Amazon Kinesis
Apache License 2.0
644 stars 467 forks source link

Lots of error messages like: ` Cancelling subscription, and marking self as failed` with `Invalid StartingSequenceNumber` since upgrading to 2.0 #391

Closed ryangardner closed 5 years ago

ryangardner commented 6 years ago
java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000234, while it was used in a call to a shard with shardId-000000000309 (Service: Kinesis, Status Code: 400, Request ID: eab0b990-9136-1aad-beb8-145829c423c5)
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:155) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:121) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$Completable.lambda$complete$0(MakeAsyncHttpRequestStage.java:200) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [na:na]
    at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
Caused by: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000234, while it was used in a call to a shard with shardId-000000000309 (Service: Kinesis, Status Code: 400, Request ID: eab0b990-9136-1aad-beb8-145829c423c5)
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:104) ~[kinesis-2.0.1.jar!/:na]
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:64) ~[kinesis-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:46) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:30) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.async.SyncResponseHandlerAdapter.complete(SyncResponseHandlerAdapter.java:92) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.client.handler.BaseAsyncClientHandler$InterceptorCallingHttpResponseHandler.complete(BaseAsyncClientHandler.java:225) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.handleResponse(MakeAsyncHttpRequestStage.java:185) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:171) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:122) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:255) ~[netty-nio-client-2.0.1.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.publishMessage(HandlerPublisher.java:362) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.flushBuffer(HandlerPublisher.java:304) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.receivedDemand(HandlerPublisher.java:258) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.access$200(HandlerPublisher.java:41) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher$ChannelSubscription$1.run(HandlerPublisher.java:452) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    ... 1 common frames omitted

In the last three hours, I've had these errors / warnings with the following combinations of encoded shardId's and the shard ID it claims it was used with:

(all shardId's are with the format shardId-000000000301 - so I'm omitting the first part)

Encoded Shard ID Used-with ShardID count
301 349 2507
190 205 6385
222 196 2589
302 238 2582
270 258 2503
236 198 2512
253 229 356
266 342 7336
234 309 4710
260 229 2634
333 278 2519
308 318 2669

My stream has not been resharded recently and has 80 shards.

Question: Is this something that happens normally as part of the KCL client behavior, or does this indicate something I should be concerned about? (And If it is considered to be normal behavior, can the logs be changed to something other than warn-level and the full stacktrace not be logged?)

If this is something I should be worried about - what might be causing this?

pfifer commented 6 years ago

That should not be normally happening. That indicates the sequence number being used isn't for the shard.

Is it consistently reporting the sequence is for the other shard e.g. 301 is always report 349?
Can you check the sequence numbers from your lease table. You can use the AWS CLI with something like this:

aws kinesis get-shard-iterator --stream-name <stream name> --shard-id <shard-id> --shard-iterator-type AFTER_SEQUENCE_NUMBER --starting-sequence-number <sequence number>

If it fails it means you have invalid sequence numbers in your lease table. If it doesn't fail it would indicate that somehow the KCL is getting sequence numbers mixed up internally.

ryangardner commented 6 years ago

If I run aws kinesis get-shard-iterator --stream-name <stream name> --shard-id <shard-id> --shard-iterator-type AFTER_SEQUENCE_NUMBER --starting-sequence-number <sequence number> it returns a shard iterator to me for those sequences.

It's consistent with the mismatch for a specific kinesis stream across multiple application restarts - but it's not consistent between environments. We have another 80-shard kinesis stream that is in a different aws account running the same code that has mismatches between

I dug through our log aggregation a bit more to identify the count per each unique ecs instance ID and also get the vpc IP address of the ECS host (though the IP address part shouldn't matter at all)

It shows that some of the time the lease complaining happens to just one specific instance of the application, but other times multiple instances will have that same complaint.

Here are the results from scanning over our logs for the past 7 days on two different AWS accounts using the same code with two similar kinesis streams (both 80 shards)

From our first environment / aws account

Encoded Shard ID Used in call with Host IP ECS Instance ID Count
190 205 ip-172-22-3-78 6a2e220011d4 11541
222 196 ip-172-22-1-23 fbf546269d26 161264
234 309 ip-172-22-3-219 2d0e9eaddadd 9869
236 198 ip-172-22-3-219 28975e005306 338416
245 234 ip-172-22-5-39 9b76adc85be9 4011
253 229 ip-172-22-1-23 a4aea1accd2c 17
253 229 ip-172-22-5-247 bc9c12864689 339
260 229 ip-172-22-1-167 04455b78fa22 297165
266 342 ip-172-22-1-23 a4aea1accd2c 926
266 342 ip-172-22-5-39 9b76adc85be9 3974
266 342 ip-172-22-5-194 a40d444baf06 9806
266 342 ip-172-22-1-23 8a00e48f440a 11570
266 342 ip-172-22-1-23 0cbb3b128a6b 64131
270 258 ip-172-22-3-219 310d05603251 50207
301 349 ip-172-22-3-78 24ee540d4235 333078
302 238 ip-172-22-5-194 a40d444baf06 119851
308 318 ip-172-22-5-247 2eb66729d2cd 301885
333 278 ip-172-22-3-219 28975e005306 338421

From our second environment / aws account

Encoded Shard ID Used in call with Host IP ECS Instance ID Count
177 281 ip-172-22-3-59 fa58468448f0 46438
200 302 ip-172-22-3-182 774623fa7d60 465526
302 192 ip-172-22-3-154 235ee0e3676b 30105
298 264 ip-172-22-1-13 e4600c447ba4 190447
288 242 ip-172-22-1-205 5f4c61573241 196259
288 242 ip-172-22-1-172 04c509806a3f 11826
280 281 ip-172-22-5-180 e9623dbadb82 405401

One setting I have enabled in our scheduler is:

configsBuilder.coordinatorConfig()
                        .skipShardSyncAtWorkerInitializationIfLeasesExist(true)

I can turn that off if you think it might be involved - though it seems like that should only be involved at startup

pfifer commented 6 years ago

You're not using the same application name for different streams?

ryangardner commented 6 years ago

No, we have only one application using the KCL and it's only reading from the one stream. (Eventually we may add more applications processing this stream)

We have this same set up running in two different accounts - and there is a different application name (and kinesis stream) used on each of the aws accounts running this.

pfifer commented 6 years ago

Skip shard sync won't cause this sort of issue. What version of Java are you using? The stack traces look like a newer JVM. A newer version of Java shouldn't cause something like this, but it's at least one dimension.

Does the same behavior happen if you run the 1.9.1 version of the KCL against the stream?

ryangardner commented 6 years ago

We're using Java 1.10. I could try to swap it out with 1.9 or 1.8 if that would help.

We saw some issues in 1.9.1 where it seemed like the client binary was crashing / restarting at odd points which is part of what motivated us to go to 2.0 - the error messages we saw on 1.9.1 were more along the lines of:

Caught shutdown exception, skipping checkpoint. com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard

but they weren't nearly as frequent. I think there were some other errors in our logs for clients prior to 2.0 about how the kcl processor was shutting down. I'll have to dig around more to find the specific log messages

pfifer commented 6 years ago

If trying out Java 1.8 is easy that would be the best. I don't know why Java 1.10 would be causing issues, and I can look at testing it in the future.

The error you're seeing can occur on 2.0 as well. It means the worker/scheduler no longer holds the lease. The 1.x version of the KCL unfortunately overloaded the shutdown(...) method with two different possible entry points:

This was confusing so 2.0 broke it out to two different methods:

ryangardner commented 6 years ago

An update on this issue. We haven't had a chance to deploy out with java 1.8 yet, but yesterday we did reshard one of the streams. The error was gone this morning, but looking at the logs the error went away around six or seven hours after we re-sharded so it seems unlikely resharding it was what made the error go away.

I checked the logs on our other account where we did not reshard it, and on that account the error also went away without us doing any changes on our side. One the account that we didn't reshard, it went away a few hours before it went away on the other account.

Maybe something changed on the AWS side with regard to those streams? We certainly didn't change anything on our side yesterday.

ryangardner commented 6 years ago

Another update: I re-sharded that account that we didn't reshard yesterday and then the errors came back on that account but not on the other one.

We're investigating some issues with messages not getting through our pipeline and this particular issue is our prime suspect.

Looking at the shard-level metrics in cloudwatch, the shard that gets complained about ("It encodes X, while it was used in a call to a shard with Y") - shard Y shows it has incoming bytes but no outgoing bytes.

Every other shard has both incoming and outgoing bytes.

At the point this error has happened, what mitigation strategy do we have? I'm assuming it wouldn't have advanced any checkpointing forward - if we kill / restart any instance that shows those errors would another one get that lease and read those records?

pfifer commented 6 years ago

Are there any messages emitted from software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher? There should be warning messages about the exception.

athielen2 commented 6 years ago

Seeing the same thing in our application since updating to use KCL 2.0. We have multiple shards that have data coming in, but none coming out. When we redeploy to our nodes, and the application is restarted, some of those shards will begin reading again from the last checkpoint. However, the issue will often reappear on different shards.

Here is a graph showing one of the shards not reading:

screen shot 2018-09-11 at 11 29 52 am

Below is the error we are seeing from the FanOutRecordsPublisher:


java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000027, while it was used in a call to a shard with shardId-000000000008 (Service: Kinesis, Status Code: 400, Request ID: f6ab9909-d399-21e0-a2ba-74dd3fafa6ff)
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_181]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_181]
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) ~[?:1.8.0_181]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_181]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_181]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_181]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:155) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:121) ~[measurement-snapshot-captor-stream-reader.jar:?]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_181]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_181]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_181]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_181]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$Completable.lambda$complete$0(MakeAsyncHttpRequestStage.java:200) ~[measurement-snapshot-captor-stream-reader.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000027, while it was used in a call to a shard with shardId-000000000008 (Service: Kinesis, Status Code: 400, Request ID: f6ab9909-d399-21e0-a2ba-74dd3fafa6ff)
at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:104) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:64) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:46) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:30) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.core.internal.http.async.SyncResponseHandlerAdapter.complete(SyncResponseHandlerAdapter.java:92) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.core.client.handler.BaseAsyncClientHandler$InterceptorCallingHttpResponseHandler.complete(BaseAsyncClientHandler.java:225) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.handleResponse(MakeAsyncHttpRequestStage.java:185) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:171) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:122) ~[measurement-snapshot-captor-stream-reader.jar:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:255) ~[measurement-snapshot-captor-stream-reader.jar:?]
at com.typesafe.netty.HandlerPublisher.publishMessage(HandlerPublisher.java:362) ~[measurement-snapshot-captor-stream-reader.jar:?]
at com.typesafe.netty.HandlerPublisher.flushBuffer(HandlerPublisher.java:304) ~[measurement-snapshot-captor-stream-reader.jar:?]
at com.typesafe.netty.HandlerPublisher.receivedDemand(HandlerPublisher.java:258) ~[measurement-snapshot-captor-stream-reader.jar:?]
at com.typesafe.netty.HandlerPublisher.access$200(HandlerPublisher.java:41) ~[measurement-snapshot-captor-stream-reader.jar:?]
at com.typesafe.netty.HandlerPublisher$ChannelSubscription$1.run(HandlerPublisher.java:452) ~[measurement-snapshot-captor-stream-reader.jar:?]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[measurement-snapshot-captor-stream-reader.jar:?]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[measurement-snapshot-captor-stream-reader.jar:?]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464) ~[measurement-snapshot-captor-stream-reader.jar:?]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[measurement-snapshot-captor-stream-reader.jar:?]
... 1 more```
pfifer commented 6 years ago

Just opened a PR that will add some logging around the initialization phase. I will merge the PR in a bit, if possible could you try out the new version.

If you do you need to enable debug logging for the following classes:

This will log the sequence number returned from DynamoDB, handled by the InitializeTask, and provided to the FanOutRecordsPublisher for initialization.

ryangardner commented 6 years ago

My short term workaround was going to be adding a System.exit() in my shard consumer when that error pops up - but I can push out a version with this logging turned on to get that information to you.

pfifer commented 6 years ago

Are you comfortable building from the source or do you need a release to be available in Central?

ryangardner commented 6 years ago

I can build from source and put it into my internal artifact server, no big deal

pfifer commented 6 years ago

The change is now in master. If you can build it. You will need to enable debug logging for the classes I listed above. This should log the sequence number retrieved from DynamoDB, and used to initialize the FanOutRecordsPublisher. It would also be beneficial if you have thread id's in the logs.

You should see something like:

2018-09-11 11:10:29,974 [ShardRecordProcessor-0000] DEBUG s.a.k.c.d.DynamoDBCheckpointer [NONE] [NONE] - [shardId-000000000000] Retrieved lease => Lease(leaseKey=shardId-000000000000, leaseOwner=d25ed3d6-7f0f-4b4a-9ca7-1bbb90430afc, leaseCounter=146, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49587497311274533994574834252742144236107130636007899138,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[]) 
2018-09-11 11:10:29,975 [ShardRecordProcessor-0000] DEBUG s.a.kinesis.lifecycle.InitializeTask [NONE] [NONE] - [shardId-000000000000]: Checkpoint: {SequenceNumber: 49587497311274533994574834252742144236107130636007899138,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null) 
2018-09-11 11:10:29,975 [ShardRecordProcessor-0000] DEBUG s.a.k.r.f.FanOutRecordsPublisher [NONE] [NONE] - [shardId-000000000000] Initializing Publisher @ Sequence: {SequenceNumber: 49587497311274533994574834252742144236107130636007899138,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null) 

When the lease is first acquired. If the problem is occurring at that point you should get the error messages shortly afterwards.

Thanks for your help in tracking this down.

athielen2 commented 6 years ago

@pfifer would it be possible to get this available through Central?

athielen2 commented 6 years ago

Don't know if this is helpful or not, but we also noticed that for each shard that is not reading, the ownerSwitchesSinceCheckpoint column in the checkpoint table is non-zero:

screen shot 2018-09-11 at 4 49 49 pm

pfifer commented 6 years ago

We normally don't publish snapshots to Central, but I can investigate doing so tomorrow. Otherwise it will need to be a full release, and I would take a little bit more time.

ryangardner commented 6 years ago

Here are some logs:

(Note that the stacktraces might be in a weird order - I exported the logs from my log aggregation tool and it puts them in reverse chronological order - so I used a tail -r on the exported logs to get them back in normal chronological order but that probably did strange things to multi-line messages like stack traces.)


2018-09-11 22:27:39.506  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Sleeping ...
2018-09-11 22:28:23.665  INFO 7 --- [oordinator-0000] s.a.k.l.dynamodb.DynamoDBLeaseTaker      : Worker fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc saw 162 total leases, 52 available leases, 10 workers. Target is 17 leases, I have 0 leases, I will take 17 leases
2018-09-11 22:28:23.807  INFO 7 --- [oordinator-0000] s.a.k.l.dynamodb.DynamoDBLeaseTaker      : Worker fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc successfully took 14 leases: shardId-000000000427, shardId-000000000436, shardId-000000000401, shardId-000000000488, shardId-000000000498, shardId-000000000377, shardId-000000000497, shardId-000000000374, shardId-000000000363, shardId-000000000450, shardId-000000000384, shardId-000000000492, shardId-000000000490, shardId-000000000507
2018-09-11 22:28:23.807  INFO 7 --- [oordinator-0000] s.a.k.l.dynamodb.DynamoDBLeaseTaker      : Worker fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc failed to take 3 leases: shardId-000000000448, shardId-000000000359, shardId-000000000462
2018-09-11 22:28:24.774  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000427, concurrencyToken=e0e36f55-e9bf-4175-80a4-bc10af24e7d0, parentShardIds=[shardId-000000000269], checkpoint={SequenceNumber: 49588160275566154496829720989890449304599334072021424818,SubsequenceNumber: 0})
2018-09-11 22:28:24.776  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000436, concurrencyToken=6bfc7a02-85dc-4aeb-bfaa-78bee1f1f612, parentShardIds=[shardId-000000000276], checkpoint={SequenceNumber: 49588160294677893131970461433826345716085141884089932610,SubsequenceNumber: 0})
2018-09-11 22:28:24.777  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000401, concurrencyToken=b695f3bb-1229-413c-ab29-fb1579322db5, parentShardIds=[shardId-000000000242], checkpoint={SequenceNumber: 49588160223248606261076875455806734688981848970781268242,SubsequenceNumber: 0})
2018-09-11 22:28:24.779  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000488, concurrencyToken=f1dc693a-be84-4252-848e-0d083d3646a9, parentShardIds=[shardId-000000000330], checkpoint={SequenceNumber: 49588160296506554238249968084872546083315933702822828898,SubsequenceNumber: 0})
2018-09-11 22:28:24.780  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000377, concurrencyToken=2521875e-7ca7-4f20-84a8-98c3981dbb5b, parentShardIds=[shardId-000000000218], checkpoint={SequenceNumber: 49588160182750452980545263886462202483121859579256772498,SubsequenceNumber: 0})
2018-09-11 22:28:24.780  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000498, concurrencyToken=31f7976f-c846-4181-83a3-b0a81963d9fa, parentShardIds=[shardId-000000000341], checkpoint={SequenceNumber: 49588160412024414366638603992952345118213116918692323106,SubsequenceNumber: 0})
2018-09-11 22:28:24.783  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000497, concurrencyToken=02c65d17-682d-4f7c-a172-2ebd9466cab8, parentShardIds=[shardId-000000000338], checkpoint={SequenceNumber: 49588160410574865928734109925154736845483584532875583250,SubsequenceNumber: 0})
2018-09-11 22:28:24.783  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000363, concurrencyToken=c1a4e7fc-55d7-4d78-9ccf-56a4bb4a60c1, parentShardIds=[shardId-000000000205], checkpoint={SequenceNumber: 49588160153179664847293657824269075993370095812807235250,SubsequenceNumber: 0})
2018-09-11 22:28:24.784  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000374, concurrencyToken=0020a1f2-bdf8-4c5a-84c0-8eb0c84ca02d, parentShardIds=[shardId-000000000214], checkpoint={SequenceNumber: 49588160179829055359537752244981235300533442529063868258,SubsequenceNumber: 0})
2018-09-11 22:28:24.784  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000384, concurrencyToken=321aedaa-310a-4f9b-8a87-9f24e6079425, parentShardIds=[shardId-000000000226], checkpoint={SequenceNumber: 49588160198249470893524046979762061216566254160974977026,SubsequenceNumber: 0})
2018-09-11 22:28:24.785  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000450, concurrencyToken=b96eb115-c179-4803-87a5-2bdc5cbb6c75, parentShardIds=[shardId-000000000293], checkpoint={SequenceNumber: 49588160317469254724868758295573788814794062754687753250,SubsequenceNumber: 0})
2018-09-11 22:28:24.788  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000492, concurrencyToken=ccbd9153-f1a2-4f73-9806-abb5d5ca7561, parentShardIds=[shardId-000000000332], checkpoint={SequenceNumber: 49588160406181619124623577364989381944926527427488784066,SubsequenceNumber: 0})
2018-09-11 22:28:24.789  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000490, concurrencyToken=fa5f73e5-4174-463e-a354-87c568510674, parentShardIds=[shardId-000000000333], checkpoint={SequenceNumber: 49588160394719036092578829266126988628492334706561392290,SubsequenceNumber: 0})
2018-09-11 22:28:24.790  INFO 7 --- [      Thread-22] s.amazon.kinesis.coordinator.Scheduler   : Created new shardConsumer for : ShardInfo(shardId=shardId-000000000507, concurrencyToken=abc794bc-9a09-494b-b78e-1371f601ee05, parentShardIds=[shardId-000000000349], checkpoint={SequenceNumber: 49588160429708905309073384494638966274635349735443013554,SubsequenceNumber: 0})
2018-09-11 22:28:24.820  INFO 7 --- [dProcessor-0008] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000214. Not blocking on completion of this shard.
2018-09-11 22:28:24.821  INFO 7 --- [dProcessor-0008] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000214] of shard shardId-000000000374
2018-09-11 22:28:24.842  INFO 7 --- [dProcessor-0002] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000242. Not blocking on completion of this shard.
2018-09-11 22:28:24.842  INFO 7 --- [dProcessor-0002] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000242] of shard shardId-000000000401
2018-09-11 22:28:24.848  INFO 7 --- [dProcessor-0005] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000341. Not blocking on completion of this shard.
2018-09-11 22:28:24.848  INFO 7 --- [dProcessor-0005] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000341] of shard shardId-000000000498
2018-09-11 22:28:24.854  INFO 7 --- [dProcessor-0010] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000293. Not blocking on completion of this shard.
2018-09-11 22:28:24.854  INFO 7 --- [dProcessor-0010] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000293] of shard shardId-000000000450
2018-09-11 22:28:24.858  INFO 7 --- [dProcessor-0001] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000276. Not blocking on completion of this shard.
2018-09-11 22:28:24.858  INFO 7 --- [dProcessor-0001] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000276] of shard shardId-000000000436
2018-09-11 22:28:24.860  INFO 7 --- [dProcessor-0006] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000338. Not blocking on completion of this shard.
2018-09-11 22:28:24.860  INFO 7 --- [dProcessor-0006] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000338] of shard shardId-000000000497
2018-09-11 22:28:24.860  INFO 7 --- [dProcessor-0000] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000269. Not blocking on completion of this shard.
2018-09-11 22:28:24.861  INFO 7 --- [dProcessor-0000] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000269] of shard shardId-000000000427
2018-09-11 22:28:24.861  INFO 7 --- [dProcessor-0009] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000226. Not blocking on completion of this shard.
2018-09-11 22:28:24.861  INFO 7 --- [dProcessor-0009] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000226] of shard shardId-000000000384
2018-09-11 22:28:24.862  INFO 7 --- [dProcessor-0004] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000218. Not blocking on completion of this shard.
2018-09-11 22:28:24.862  INFO 7 --- [dProcessor-0004] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000218] of shard shardId-000000000377
2018-09-11 22:28:24.863  INFO 7 --- [dProcessor-0007] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000205. Not blocking on completion of this shard.
2018-09-11 22:28:24.863  INFO 7 --- [dProcessor-0007] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000205] of shard shardId-000000000363
2018-09-11 22:28:24.867  INFO 7 --- [dProcessor-0011] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000332. Not blocking on completion of this shard.
2018-09-11 22:28:24.867  INFO 7 --- [dProcessor-0011] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000332] of shard shardId-000000000492
2018-09-11 22:28:24.868  INFO 7 --- [dProcessor-0012] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000333. Not blocking on completion of this shard.
2018-09-11 22:28:24.868  INFO 7 --- [dProcessor-0012] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000333] of shard shardId-000000000490
2018-09-11 22:28:24.873  INFO 7 --- [dProcessor-0003] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000330. Not blocking on completion of this shard.
2018-09-11 22:28:24.873  INFO 7 --- [dProcessor-0003] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000330] of shard shardId-000000000488
2018-09-11 22:28:24.905  INFO 7 --- [dProcessor-0013] s.a.k.lifecycle.BlockOnParentShardTask   : No lease found for shard shardId-000000000349. Not blocking on completion of this shard.
2018-09-11 22:28:24.905  INFO 7 --- [dProcessor-0013] s.a.k.lifecycle.BlockOnParentShardTask   : No need to block on parents [shardId-000000000349] of shard shardId-000000000507
2018-09-11 22:28:25.794 DEBUG 7 --- [dProcessor-0013] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000427, concurrencyToken=e0e36f55-e9bf-4175-80a4-bc10af24e7d0, parentShardIds=[shardId-000000000269], checkpoint={SequenceNumber: 49588160275566154496829720989890449304599334072021424818,SubsequenceNumber: 0})
2018-09-11 22:28:25.794 DEBUG 7 --- [dProcessor-0010] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000450, concurrencyToken=b96eb115-c179-4803-87a5-2bdc5cbb6c75, parentShardIds=[shardId-000000000293], checkpoint={SequenceNumber: 49588160317469254724868758295573788814794062754687753250,SubsequenceNumber: 0})
2018-09-11 22:28:25.794 DEBUG 7 --- [dProcessor-0008] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000507, concurrencyToken=abc794bc-9a09-494b-b78e-1371f601ee05, parentShardIds=[shardId-000000000349], checkpoint={SequenceNumber: 49588160429708905309073384494638966274635349735443013554,SubsequenceNumber: 0})
2018-09-11 22:28:25.794 DEBUG 7 --- [dProcessor-0002] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000490, concurrencyToken=fa5f73e5-4174-463e-a354-87c568510674, parentShardIds=[shardId-000000000333], checkpoint={SequenceNumber: 49588160394719036092578829266126988628492334706561392290,SubsequenceNumber: 0})
2018-09-11 22:28:25.794 DEBUG 7 --- [dProcessor-0003] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000436, concurrencyToken=6bfc7a02-85dc-4aeb-bfaa-78bee1f1f612, parentShardIds=[shardId-000000000276], checkpoint={SequenceNumber: 49588160294677893131970461433826345716085141884089932610,SubsequenceNumber: 0})
2018-09-11 22:28:25.794 DEBUG 7 --- [dProcessor-0009] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000497, concurrencyToken=02c65d17-682d-4f7c-a172-2ebd9466cab8, parentShardIds=[shardId-000000000338], checkpoint={SequenceNumber: 49588160410574865928734109925154736845483584532875583250,SubsequenceNumber: 0})
2018-09-11 22:28:25.795 DEBUG 7 --- [dProcessor-0001] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000384, concurrencyToken=321aedaa-310a-4f9b-8a87-9f24e6079425, parentShardIds=[shardId-000000000226], checkpoint={SequenceNumber: 49588160198249470893524046979762061216566254160974977026,SubsequenceNumber: 0})
2018-09-11 22:28:25.795 DEBUG 7 --- [dProcessor-0000] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000363, concurrencyToken=c1a4e7fc-55d7-4d78-9ccf-56a4bb4a60c1, parentShardIds=[shardId-000000000205], checkpoint={SequenceNumber: 49588160153179664847293657824269075993370095812807235250,SubsequenceNumber: 0})
2018-09-11 22:28:25.795 DEBUG 7 --- [dProcessor-0005] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000492, concurrencyToken=ccbd9153-f1a2-4f73-9806-abb5d5ca7561, parentShardIds=[shardId-000000000332], checkpoint={SequenceNumber: 49588160406181619124623577364989381944926527427488784066,SubsequenceNumber: 0})
2018-09-11 22:28:25.795 DEBUG 7 --- [dProcessor-0007] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000377, concurrencyToken=2521875e-7ca7-4f20-84a8-98c3981dbb5b, parentShardIds=[shardId-000000000218], checkpoint={SequenceNumber: 49588160182750452980545263886462202483121859579256772498,SubsequenceNumber: 0})
2018-09-11 22:28:25.795 DEBUG 7 --- [dProcessor-0004] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000498, concurrencyToken=31f7976f-c846-4181-83a3-b0a81963d9fa, parentShardIds=[shardId-000000000341], checkpoint={SequenceNumber: 49588160412024414366638603992952345118213116918692323106,SubsequenceNumber: 0})
2018-09-11 22:28:25.795 DEBUG 7 --- [dProcessor-0011] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000488, concurrencyToken=f1dc693a-be84-4252-848e-0d083d3646a9, parentShardIds=[shardId-000000000330], checkpoint={SequenceNumber: 49588160296506554238249968084872546083315933702822828898,SubsequenceNumber: 0})
2018-09-11 22:28:25.795 DEBUG 7 --- [dProcessor-0006] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000374, concurrencyToken=0020a1f2-bdf8-4c5a-84c0-8eb0c84ca02d, parentShardIds=[shardId-000000000214], checkpoint={SequenceNumber: 49588160179829055359537752244981235300533442529063868258,SubsequenceNumber: 0})
2018-09-11 22:28:25.796 DEBUG 7 --- [dProcessor-0012] s.a.kinesis.lifecycle.InitializeTask     : Initializing ShardId ShardInfo(shardId=shardId-000000000401, concurrencyToken=b695f3bb-1229-413c-ab29-fb1579322db5, parentShardIds=[shardId-000000000242], checkpoint={SequenceNumber: 49588160223248606261076875455806734688981848970781268242,SubsequenceNumber: 0})
2018-09-11 22:28:25.804 DEBUG 7 --- [dProcessor-0000] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000363] Retrieved lease => Lease(leaseKey=shardId-000000000363, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=535, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160153179664847293657824269075993370095812807235250,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000205])
2018-09-11 22:28:25.804 DEBUG 7 --- [dProcessor-0003] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000436] Retrieved lease => Lease(leaseKey=shardId-000000000436, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=537, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160294677893131970461433826345716085141884089932610,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000276])
2018-09-11 22:28:25.805 DEBUG 7 --- [dProcessor-0000] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000363]: Checkpoint: {SequenceNumber: 49588160153179664847293657824269075993370095812807235250,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.805 DEBUG 7 --- [dProcessor-0003] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000436]: Checkpoint: {SequenceNumber: 49588160294677893131970461433826345716085141884089932610,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.805 DEBUG 7 --- [dProcessor-0003] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000436] Initializing Publisher @ Sequence: {SequenceNumber: 49588160294677893131970461433826345716085141884089932610,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.805 DEBUG 7 --- [dProcessor-0011] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000488] Retrieved lease => Lease(leaseKey=shardId-000000000488, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=421, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160296506554238249968084872546083315933702822828898,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=5, parentShardIds=[shardId-000000000330])
2018-09-11 22:28:25.805 DEBUG 7 --- [dProcessor-0003] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.805 DEBUG 7 --- [dProcessor-0000] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000363] Initializing Publisher @ Sequence: {SequenceNumber: 49588160153179664847293657824269075993370095812807235250,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.805 DEBUG 7 --- [dProcessor-0002] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000490] Retrieved lease => Lease(leaseKey=shardId-000000000490, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=438, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160394719036092578829266126988628492334706561392290,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000333])
2018-09-11 22:28:25.805 DEBUG 7 --- [dProcessor-0000] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.805 DEBUG 7 --- [dProcessor-0011] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000488]: Checkpoint: {SequenceNumber: 49588160296506554238249968084872546083315933702822828898,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0002] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000490]: Checkpoint: {SequenceNumber: 49588160394719036092578829266126988628492334706561392290,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0002] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000490] Initializing Publisher @ Sequence: {SequenceNumber: 49588160394719036092578829266126988628492334706561392290,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0002] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0009] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000497] Retrieved lease => Lease(leaseKey=shardId-000000000497, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=555, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160410574865928734109925154736845483584532875583250,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000338])
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0008] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000507] Retrieved lease => Lease(leaseKey=shardId-000000000507, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=539, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160429708905309073384494638966274635349735443013554,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000349])
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0009] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000497]: Checkpoint: {SequenceNumber: 49588160410574865928734109925154736845483584532875583250,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0009] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000497] Initializing Publisher @ Sequence: {SequenceNumber: 49588160410574865928734109925154736845483584532875583250,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0008] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000507]: Checkpoint: {SequenceNumber: 49588160429708905309073384494638966274635349735443013554,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0009] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0008] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000507] Initializing Publisher @ Sequence: {SequenceNumber: 49588160429708905309073384494638966274635349735443013554,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0008] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0013] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000427] Retrieved lease => Lease(leaseKey=shardId-000000000427, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=515, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160275566154496829720989890449304599334072021424818,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000269])
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0012] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000401] Retrieved lease => Lease(leaseKey=shardId-000000000401, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=529, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160223248606261076875455806734688981848970781268242,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000242])
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0013] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000427]: Checkpoint: {SequenceNumber: 49588160275566154496829720989890449304599334072021424818,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0012] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000401]: Checkpoint: {SequenceNumber: 49588160223248606261076875455806734688981848970781268242,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0013] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000427] Initializing Publisher @ Sequence: {SequenceNumber: 49588160275566154496829720989890449304599334072021424818,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.806 DEBUG 7 --- [dProcessor-0012] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000401] Initializing Publisher @ Sequence: {SequenceNumber: 49588160223248606261076875455806734688981848970781268242,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0012] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0011] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000488] Initializing Publisher @ Sequence: {SequenceNumber: 49588160296506554238249968084872546083315933702822828898,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0013] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0011] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0006] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000374] Retrieved lease => Lease(leaseKey=shardId-000000000374, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=587, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160179829055359537752244981235300533442529063868258,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000214])
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0004] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000498] Retrieved lease => Lease(leaseKey=shardId-000000000498, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=520, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160412024414366638603992952345118213116918692323106,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000341])
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0006] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000374]: Checkpoint: {SequenceNumber: 49588160179829055359537752244981235300533442529063868258,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0007] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000377] Retrieved lease => Lease(leaseKey=shardId-000000000377, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=537, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160182750452980545263886462202483121859579256772498,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000218])
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0006] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000374] Initializing Publisher @ Sequence: {SequenceNumber: 49588160179829055359537752244981235300533442529063868258,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0007] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000377]: Checkpoint: {SequenceNumber: 49588160182750452980545263886462202483121859579256772498,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0004] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000498]: Checkpoint: {SequenceNumber: 49588160412024414366638603992952345118213116918692323106,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0006] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0004] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000498] Initializing Publisher @ Sequence: {SequenceNumber: 49588160412024414366638603992952345118213116918692323106,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0007] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000377] Initializing Publisher @ Sequence: {SequenceNumber: 49588160182750452980545263886462202483121859579256772498,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0007] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0004] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.807  INFO 7 --- [dProcessor-0007] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000377, extended sequence number: {SequenceNumber: 49588160182750452980545263886462202483121859579256772498,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.807  INFO 7 --- [dProcessor-0006] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000374, extended sequence number: {SequenceNumber: 49588160179829055359537752244981235300533442529063868258,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.807  INFO 7 --- [dProcessor-0003] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000436, extended sequence number: {SequenceNumber: 49588160294677893131970461433826345716085141884089932610,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0007] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0006] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.807 DEBUG 7 --- [dProcessor-0003] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.807  INFO 7 --- [dProcessor-0004] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000498, extended sequence number: {SequenceNumber: 49588160412024414366638603992952345118213116918692323106,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0001] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000384] Retrieved lease => Lease(leaseKey=shardId-000000000384, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=525, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160198249470893524046979762061216566254160974977026,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000226])
2018-09-11 22:28:25.808  INFO 7 --- [dProcessor-0002] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000490, extended sequence number: {SequenceNumber: 49588160394719036092578829266126988628492334706561392290,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0002] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.808  INFO 7 --- [dProcessor-0012] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000401, extended sequence number: {SequenceNumber: 49588160223248606261076875455806734688981848970781268242,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.808  INFO 7 --- [dProcessor-0009] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000497, extended sequence number: {SequenceNumber: 49588160410574865928734109925154736845483584532875583250,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0012] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.808  INFO 7 --- [dProcessor-0013] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000427, extended sequence number: {SequenceNumber: 49588160275566154496829720989890449304599334072021424818,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0013] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.808  INFO 7 --- [dProcessor-0000] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000363, extended sequence number: {SequenceNumber: 49588160153179664847293657824269075993370095812807235250,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0000] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0001] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000384]: Checkpoint: {SequenceNumber: 49588160198249470893524046979762061216566254160974977026,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0010] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000450] Retrieved lease => Lease(leaseKey=shardId-000000000450, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=543, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160317469254724868758295573788814794062754687753250,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000293])
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0001] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000384] Initializing Publisher @ Sequence: {SequenceNumber: 49588160198249470893524046979762061216566254160974977026,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0001] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0009] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.808  INFO 7 --- [dProcessor-0008] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000507, extended sequence number: {SequenceNumber: 49588160429708905309073384494638966274635349735443013554,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.808  INFO 7 --- [dProcessor-0011] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000488, extended sequence number: {SequenceNumber: 49588160296506554238249968084872546083315933702822828898,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.808 DEBUG 7 --- [dProcessor-0010] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000450]: Checkpoint: {SequenceNumber: 49588160317469254724868758295573788814794062754687753250,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.809 DEBUG 7 --- [dProcessor-0004] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.809 DEBUG 7 --- [dProcessor-0008] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.809 DEBUG 7 --- [dProcessor-0010] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000450] Initializing Publisher @ Sequence: {SequenceNumber: 49588160317469254724868758295573788814794062754687753250,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.809 DEBUG 7 --- [dProcessor-0010] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.809  INFO 7 --- [dProcessor-0001] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000384, extended sequence number: {SequenceNumber: 49588160198249470893524046979762061216566254160974977026,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.809  INFO 7 --- [dProcessor-0010] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000450, extended sequence number: {SequenceNumber: 49588160317469254724868758295573788814794062754687753250,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.809 DEBUG 7 --- [dProcessor-0001] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.809 DEBUG 7 --- [dProcessor-0010] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.809 DEBUG 7 --- [dProcessor-0005] s.a.k.c.dynamodb.DynamoDBCheckpointer    : [shardId-000000000492] Retrieved lease => Lease(leaseKey=shardId-000000000492, leaseOwner=fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc, leaseCounter=521, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: 49588160406181619124623577364989381944926527427488784066,SubsequenceNumber: 0}, pendingCheckpoint=null, ownerSwitchesSinceCheckpoint=1, parentShardIds=[shardId-000000000332])
2018-09-11 22:28:25.809 DEBUG 7 --- [dProcessor-0005] s.a.kinesis.lifecycle.InitializeTask     : [shardId-000000000492]: Checkpoint: {SequenceNumber: 49588160406181619124623577364989381944926527427488784066,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.810 DEBUG 7 --- [dProcessor-0005] s.a.k.r.fanout.FanOutRecordsPublisher    : [shardId-000000000492] Initializing Publisher @ Sequence: {SequenceNumber: 49588160406181619124623577364989381944926527427488784066,SubsequenceNumber: 0} -- Initial Position: InitialPositionInStreamExtended(position=LATEST, timestamp=null)
2018-09-11 22:28:25.810 DEBUG 7 --- [dProcessor-0011] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:25.810 DEBUG 7 --- [dProcessor-0005] s.a.kinesis.lifecycle.InitializeTask     : Calling the record processor initialize().
2018-09-11 22:28:25.810  INFO 7 --- [dProcessor-0005] c.c.r.rdsloader.RdsProcessor             : InitializationInput received was shardId: shardId-000000000492, extended sequence number: {SequenceNumber: 49588160406181619124623577364989381944926527427488784066,SubsequenceNumber: 0}, pending checkpoint sequence number: null
2018-09-11 22:28:25.810 DEBUG 7 --- [dProcessor-0005] s.a.kinesis.lifecycle.InitializeTask     : Record processor initialize() completed.
2018-09-11 22:28:27.819 DEBUG 7 --- [dProcessor-0001] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000436: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000436-1 -- Starting subscribe to shard
2018-09-11 22:28:27.819 DEBUG 7 --- [dProcessor-0012] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000401: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.819250Z id: shardId-000000000401-1 -- Starting subscribe to shard
2018-09-11 22:28:27.819 DEBUG 7 --- [dProcessor-0004] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000498: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000498-1 -- Starting subscribe to shard
2018-09-11 22:28:27.819 DEBUG 7 --- [dProcessor-0009] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000497: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000497-1 -- Starting subscribe to shard
2018-09-11 22:28:27.819 DEBUG 7 --- [dProcessor-0005] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000488: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.819355Z id: shardId-000000000488-1 -- Starting subscribe to shard
2018-09-11 22:28:27.819 DEBUG 7 --- [dProcessor-0007] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000490: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.819277Z id: shardId-000000000490-1 -- Starting subscribe to shard
2018-09-11 22:28:27.819 DEBUG 7 --- [dProcessor-0011] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000427: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000427-1 -- Starting subscribe to shard
2018-09-11 22:28:27.819 DEBUG 7 --- [dProcessor-0013] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000374: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.819318Z id: shardId-000000000374-1 -- Starting subscribe to shard
2018-09-11 22:28:27.819 DEBUG 7 --- [dProcessor-0003] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000492: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.819846Z id: shardId-000000000492-1 -- Starting subscribe to shard
2018-09-11 22:28:27.821 DEBUG 7 --- [dProcessor-0000] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000363: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.821576Z id: shardId-000000000363-1 -- Starting subscribe to shard
2018-09-11 22:28:27.821 DEBUG 7 --- [dProcessor-0014] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000507: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.821628Z id: shardId-000000000507-1 -- Starting subscribe to shard
2018-09-11 22:28:27.821 DEBUG 7 --- [dProcessor-0006] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000450: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.821688Z id: shardId-000000000450-1 -- Starting subscribe to shard
2018-09-11 22:28:27.821 DEBUG 7 --- [dProcessor-0002] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000384: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.821640Z id: shardId-000000000384-1 -- Starting subscribe to shard
2018-09-11 22:28:27.821 DEBUG 7 --- [dProcessor-0010] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000377: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:27.821710Z id: shardId-000000000377-1 -- Starting subscribe to shard
2018-09-11 22:28:27.865 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000507: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.821628Z id: shardId-000000000507-1 -- Subscribe
2018-09-11 22:28:27.865 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000507: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.821628Z id: shardId-000000000507-1 -- creating record subscription
2018-09-11 22:28:27.869 DEBUG 7 --- [tyEventLoop-0-1] anOutRecordsPublisher$RecordSubscription : shardId-000000000507: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.821628Z id: shardId-000000000507-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.880 DEBUG 7 --- [tyEventLoop-0-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000401: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.819250Z id: shardId-000000000401-1 -- Subscribe
2018-09-11 22:28:27.880 DEBUG 7 --- [tyEventLoop-0-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000401: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.819250Z id: shardId-000000000401-1 -- creating record subscription
2018-09-11 22:28:27.880 DEBUG 7 --- [tyEventLoop-0-0] anOutRecordsPublisher$RecordSubscription : shardId-000000000401: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.819250Z id: shardId-000000000401-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.881 DEBUG 7 --- [tyEventLoop-0-3] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000497: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.819217Z id: shardId-000000000497-1 -- Subscribe
2018-09-11 22:28:27.881 DEBUG 7 --- [tyEventLoop-0-3] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000497: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000497-1 -- creating record subscription
2018-09-11 22:28:27.881 DEBUG 7 --- [tyEventLoop-0-3] anOutRecordsPublisher$RecordSubscription : shardId-000000000497: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000497-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.883 DEBUG 7 --- [tyEventLoop-0-2] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000427: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.819217Z id: shardId-000000000427-1 -- Subscribe
2018-09-11 22:28:27.883 DEBUG 7 --- [tyEventLoop-0-2] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000427: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000427-1 -- creating record subscription
2018-09-11 22:28:27.883 DEBUG 7 --- [tyEventLoop-0-2] anOutRecordsPublisher$RecordSubscription : shardId-000000000427: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000427-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.884 DEBUG 7 --- [tyEventLoop-0-2] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000427: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000427-1 -- Response received
2018-09-11 22:28:27.884 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000507: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.821628Z id: shardId-000000000507-1 -- Response received
2018-09-11 22:28:27.884 DEBUG 7 --- [tyEventLoop-0-3] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000497: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000497-1 -- Response received
2018-09-11 22:28:27.884 DEBUG 7 --- [tyEventLoop-0-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000401: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.819250Z id: shardId-000000000401-1 -- Response received
2018-09-11 22:28:27.885 DEBUG 7 --- [tyEventLoop-0-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000498: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.819217Z id: shardId-000000000498-1 -- Subscribe
2018-09-11 22:28:27.885 DEBUG 7 --- [tyEventLoop-0-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000498: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000498-1 -- creating record subscription
2018-09-11 22:28:27.885 DEBUG 7 --- [tyEventLoop-0-0] anOutRecordsPublisher$RecordSubscription : shardId-000000000498: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000498-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.885 DEBUG 7 --- [tyEventLoop-0-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000498: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000498-1 -- Response received
2018-09-11 22:28:27.889 DEBUG 7 --- [tyEventLoop-0-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000490: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.819277Z id: shardId-000000000490-1 -- Subscribe
2018-09-11 22:28:27.890 DEBUG 7 --- [tyEventLoop-0-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000490: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.819277Z id: shardId-000000000490-1 -- creating record subscription
2018-09-11 22:28:27.890 DEBUG 7 --- [tyEventLoop-0-0] anOutRecordsPublisher$RecordSubscription : shardId-000000000490: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.819277Z id: shardId-000000000490-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.890 DEBUG 7 --- [tyEventLoop-0-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000490: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.819277Z id: shardId-000000000490-1 -- Response received
2018-09-11 22:28:27.891 DEBUG 7 --- [tyEventLoop-0-2] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000377: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.821710Z id: shardId-000000000377-1 -- Subscribe
2018-09-11 22:28:27.891 DEBUG 7 --- [tyEventLoop-0-2] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000377: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.821710Z id: shardId-000000000377-1 -- creating record subscription
2018-09-11 22:28:27.891 DEBUG 7 --- [tyEventLoop-0-2] anOutRecordsPublisher$RecordSubscription : shardId-000000000377: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.821710Z id: shardId-000000000377-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.892 DEBUG 7 --- [tyEventLoop-0-2] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000377: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.821710Z id: shardId-000000000377-1 -- Response received
2018-09-11 22:28:27.893 DEBUG 7 --- [tyEventLoop-0-3] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000363: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.821576Z id: shardId-000000000363-1 -- Subscribe
2018-09-11 22:28:27.894 DEBUG 7 --- [tyEventLoop-0-3] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000363: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.821576Z id: shardId-000000000363-1 -- creating record subscription
2018-09-11 22:28:27.894 DEBUG 7 --- [tyEventLoop-0-3] anOutRecordsPublisher$RecordSubscription : shardId-000000000363: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.821576Z id: shardId-000000000363-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.894 DEBUG 7 --- [tyEventLoop-0-3] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000363: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.821576Z id: shardId-000000000363-1 -- Response received
2018-09-11 22:28:27.902 DEBUG 7 --- [tyEventLoop-0-3] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000492: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.819846Z id: shardId-000000000492-1 -- Subscribe
2018-09-11 22:28:27.902 DEBUG 7 --- [tyEventLoop-0-3] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000492: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.819846Z id: shardId-000000000492-1 -- creating record subscription
2018-09-11 22:28:27.903 DEBUG 7 --- [tyEventLoop-0-3] anOutRecordsPublisher$RecordSubscription : shardId-000000000492: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.819846Z id: shardId-000000000492-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.903 DEBUG 7 --- [tyEventLoop-0-3] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000492: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.819846Z id: shardId-000000000492-1 -- Response received
2018-09-11 22:28:27.911 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000374: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.819318Z id: shardId-000000000374-1 -- Subscribe
2018-09-11 22:28:27.911 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000374: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.819318Z id: shardId-000000000374-1 -- creating record subscription
2018-09-11 22:28:27.911 DEBUG 7 --- [tyEventLoop-0-1] anOutRecordsPublisher$RecordSubscription : shardId-000000000374: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.819318Z id: shardId-000000000374-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.911 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000374: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.819318Z id: shardId-000000000374-1 -- Response received
2018-09-11 22:28:27.916 DEBUG 7 --- [nc-response-1-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000488: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ 2018-09-11T22:28:27.819355Z id: shardId-000000000488-1 -- java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: c194b77c-af7f-6d69-9586-f4b7597e980e)
    ... 1 common frames omitted
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:545) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1221) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1177) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1407) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:451) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:505) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:670) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at com.typesafe.netty.HandlerPublisher.handlerRemoved(HandlerPublisher.java:395) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.complete(HandlerPublisher.java:408) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:255) ~[netty-nio-client-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:122) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:171) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.handleResponse(MakeAsyncHttpRequestStage.java:185) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.client.handler.BaseAsyncClientHandler$InterceptorCallingHttpResponseHandler.complete(BaseAsyncClientHandler.java:225) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.async.SyncResponseHandlerAdapter.complete(SyncResponseHandlerAdapter.java:92) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:30) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:46) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:64) ~[kinesis-2.0.1.jar!/:na]
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:104) ~[kinesis-2.0.1.jar!/:na]
Caused by: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: c194b77c-af7f-6d69-9586-f4b7597e980e)
    at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$Completable.lambda$complete$0(MakeAsyncHttpRequestStage.java:200) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:121) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:155) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: c194b77c-af7f-6d69-9586-f4b7597e980e)
2018-09-11 22:28:27.917  WARN 7 --- [nc-response-1-0] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000488: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ 2018-09-11T22:28:27.819355Z id: shardId-000000000488-1 -- CompletionException/software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: c194b77c-af7f-6d69-9586-f4b7597e980e)
2018-09-11 22:28:27.918 DEBUG 7 --- [nc-response-1-0] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000488: availableQueueSpace zeroing from 8
    ... 1 common frames omitted
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:545) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1221) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1177) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1407) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:451) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:505) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:670) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at com.typesafe.netty.HandlerPublisher.handlerRemoved(HandlerPublisher.java:395) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.complete(HandlerPublisher.java:408) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:255) ~[netty-nio-client-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:122) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:171) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.handleResponse(MakeAsyncHttpRequestStage.java:185) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.client.handler.BaseAsyncClientHandler$InterceptorCallingHttpResponseHandler.complete(BaseAsyncClientHandler.java:225) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.async.SyncResponseHandlerAdapter.complete(SyncResponseHandlerAdapter.java:92) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:30) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:46) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:64) ~[kinesis-2.0.1.jar!/:na]
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:104) ~[kinesis-2.0.1.jar!/:na]
Caused by: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: c194b77c-af7f-6d69-9586-f4b7597e980e)
    at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) [na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$Completable.lambda$complete$0(MakeAsyncHttpRequestStage.java:200) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:121) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:155) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: c194b77c-af7f-6d69-9586-f4b7597e980e)
2018-09-11 22:28:27.921  WARN 7 --- [dProcessor-0014] s.a.kinesis.lifecycle.ShardConsumer      : shardId-000000000488: onError().  Cancelling subscription, and marking self as failed.
2018-09-11 22:28:27.921 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000436: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.819217Z id: shardId-000000000436-1 -- Subscribe
2018-09-11 22:28:27.922 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000436: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000436-1 -- creating record subscription
2018-09-11 22:28:27.922 DEBUG 7 --- [tyEventLoop-0-1] anOutRecordsPublisher$RecordSubscription : shardId-000000000436: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000436-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.922 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000436: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.819217Z id: shardId-000000000436-1 -- Response received
2018-09-11 22:28:27.947 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000384: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.821640Z id: shardId-000000000384-1 -- Subscribe
2018-09-11 22:28:27.947 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000384: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.821640Z id: shardId-000000000384-1 -- creating record subscription
2018-09-11 22:28:27.947 DEBUG 7 --- [tyEventLoop-0-1] anOutRecordsPublisher$RecordSubscription : shardId-000000000384: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.821640Z id: shardId-000000000384-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:27.947 DEBUG 7 --- [tyEventLoop-0-1] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000384: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.821640Z id: shardId-000000000384-1 -- Response received
2018-09-11 22:28:28.063 DEBUG 7 --- [tyEventLoop-0-2] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000450: [SubscriptionLifetime]: (RecordFlow#onEventStream)  @ 2018-09-11T22:28:27.821688Z id: shardId-000000000450-1 -- Subscribe
2018-09-11 22:28:28.063 DEBUG 7 --- [tyEventLoop-0-2] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000450: [SubscriptionLifetime]: (RecordFlow#onEventStream) @ 2018-09-11T22:28:27.821688Z id: shardId-000000000450-1 -- creating record subscription
2018-09-11 22:28:28.064 DEBUG 7 --- [tyEventLoop-0-2] anOutRecordsPublisher$RecordSubscription : shardId-000000000450: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ 2018-09-11T22:28:27.821688Z id: shardId-000000000450-1 -- Outstanding: 8 items so requesting an item
2018-09-11 22:28:28.064 DEBUG 7 --- [tyEventLoop-0-2] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000450: [SubscriptionLifetime]: (RecordFlow#responseReceived) @ 2018-09-11T22:28:27.821688Z id: shardId-000000000450-1 -- Response received
2018-09-11 22:28:28.179  INFO 7 --- [dProcessor-0004] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.000084S ms in cache
2018-09-11 22:28:28.269  INFO 7 --- [dProcessor-0002] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.000726S ms in cache
2018-09-11 22:28:28.434  INFO 7 --- [dProcessor-0014] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.000706S ms in cache
2018-09-11 22:28:28.449  INFO 7 --- [dProcessor-0001] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.000076S ms in cache
2018-09-11 22:28:28.457  INFO 7 --- [dProcessor-0000] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.003505S ms in cache
2018-09-11 22:28:28.593  INFO 7 --- [dProcessor-0013] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.000119S ms in cache
2018-09-11 22:28:28.634  INFO 7 --- [dProcessor-0010] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.004575S ms in cache
2018-09-11 22:28:28.638  INFO 7 --- [dProcessor-0011] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.003528S ms in cache
2018-09-11 22:28:28.671  INFO 7 --- [dProcessor-0003] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.003506S ms in cache
2018-09-11 22:28:28.671  INFO 7 --- [dProcessor-0006] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.00009S ms in cache
2018-09-11 22:28:28.675  INFO 7 --- [dProcessor-0009] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.00013S ms in cache
2018-09-11 22:28:28.678  INFO 7 --- [dProcessor-0007] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.000075S ms in cache
2018-09-11 22:28:28.680  INFO 7 --- [dProcessor-0012] c.c.r.rdsloader.RdsProcessor             : Processing record that was PT0S ( 0 ms) behind latest and spent PT0.001022S ms in cache
2018-09-11 22:28:28.809  INFO 7 --- [dProcessor-0011] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000377 at sequenceNumber 49588160182750452980545263886462202483121859579256772498, subSequenceNumber 1
2018-09-11 22:28:28.809  INFO 7 --- [dProcessor-0010] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000384 at sequenceNumber 49588160198249470893524046979762061216566254160974977026, subSequenceNumber 1
2018-09-11 22:28:28.811  INFO 7 --- [dProcessor-0001] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000436 at sequenceNumber 49588160294677893131970461433826345716085141884089932610, subSequenceNumber 1
2018-09-11 22:28:28.811  INFO 7 --- [dProcessor-0004] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000427 at sequenceNumber 49588160275566154496829720989890449304599334072021424818, subSequenceNumber 1
    ... 1 common frames omitted
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:545) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1221) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1177) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1407) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:451) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:505) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:670) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at com.typesafe.netty.HandlerPublisher.handlerRemoved(HandlerPublisher.java:395) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.complete(HandlerPublisher.java:408) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:255) ~[netty-nio-client-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:122) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:171) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.handleResponse(MakeAsyncHttpRequestStage.java:185) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.client.handler.BaseAsyncClientHandler$InterceptorCallingHttpResponseHandler.complete(BaseAsyncClientHandler.java:225) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.async.SyncResponseHandlerAdapter.complete(SyncResponseHandlerAdapter.java:92) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:30) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:46) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:64) ~[kinesis-2.0.1.jar!/:na]
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:104) ~[kinesis-2.0.1.jar!/:na]
Caused by: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: c194b77c-af7f-6d69-9586-f4b7597e980e)
    at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$Completable.lambda$complete$0(MakeAsyncHttpRequestStage.java:200) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:121) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:155) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: c194b77c-af7f-6d69-9586-f4b7597e980e)
2018-09-11 22:28:28.815  WARN 7 --- [      Thread-22] s.a.kinesis.lifecycle.ShardConsumer      : shardId-000000000488: Failure occurred in retrieval.  Restarting data requests
2018-09-11 22:28:28.816 DEBUG 7 --- [dProcessor-0008] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000488: [SubscriptionLifetime]: (FanOutRecordsPublisher#subscribeToShard) @ 2018-09-11T22:28:28.816030Z id: shardId-000000000488-2 -- Starting subscribe to shard
2018-09-11 22:28:28.816  INFO 7 --- [dProcessor-0014] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000507 at sequenceNumber 49588160429708905309073384494655891236109954956205760434, subSequenceNumber 0
2018-09-11 22:28:28.817  INFO 7 --- [dProcessor-0009] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000374 at sequenceNumber 49588160179829055359537752244981235300533442529063868258, subSequenceNumber 1
2018-09-11 22:28:28.826  INFO 7 --- [dProcessor-0012] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000498 at sequenceNumber 49588160412024414366638603992952345118213116918692323106, subSequenceNumber 1
2018-09-11 22:28:28.837  INFO 7 --- [dProcessor-0002] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000497 at sequenceNumber 49588160410574865928734109925154736845483584532875583250, subSequenceNumber 1
2018-09-11 22:28:28.837  INFO 7 --- [dProcessor-0007] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000363 at sequenceNumber 49588160153179664847293657824269075993370095812807235250, subSequenceNumber 1
2018-09-11 22:28:28.848  INFO 7 --- [dProcessor-0013] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000490 at sequenceNumber 49588160394719036092578829266126988628492334706561392290, subSequenceNumber 1
2018-09-11 22:28:28.848  INFO 7 --- [dProcessor-0003] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000401 at sequenceNumber 49588160223248606261076875455806734688981848970781268242, subSequenceNumber 1
2018-09-11 22:28:28.851  INFO 7 --- [dProcessor-0006] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000450 at sequenceNumber 49588160317469254724868758295577415592252907054528732194, subSequenceNumber 0
2018-09-11 22:28:28.868  INFO 7 --- [dProcessor-0000] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000492 at sequenceNumber 49588160406181619124623577364989381944926527427488784066, subSequenceNumber 1
2018-09-11 22:28:28.885 DEBUG 7 --- [nc-response-1-0] .k.r.f.FanOutRecordsPublisher$RecordFlow : shardId-000000000488: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ 2018-09-11T22:28:28.816030Z id: shardId-000000000488-2 -- java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: d14e06d2-9d4e-623e-855c-451efe8bd3b9)
    ... 1 common frames omitted
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    at com.typesafe.netty.HandlerPublisher$ChannelSubscription$1.run(HandlerPublisher.java:452) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.access$200(HandlerPublisher.java:41) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.receivedDemand(HandlerPublisher.java:258) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.flushBuffer(HandlerPublisher.java:304) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at com.typesafe.netty.HandlerPublisher.publishMessage(HandlerPublisher.java:362) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:255) ~[netty-nio-client-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:122) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:171) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.handleResponse(MakeAsyncHttpRequestStage.java:185) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.client.handler.BaseAsyncClientHandler$InterceptorCallingHttpResponseHandler.complete(BaseAsyncClientHandler.java:225) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.async.SyncResponseHandlerAdapter.complete(SyncResponseHandlerAdapter.java:92) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:30) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:46) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:64) ~[kinesis-2.0.1.jar!/:na]
    at software.amazon.awssdk.services.kinesis.model.InvalidArgumentException$BuilderImpl.build(InvalidArgumentException.java:104) ~[kinesis-2.0.1.jar!/:na]
Caused by: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: d14e06d2-9d4e-623e-855c-451efe8bd3b9)
    at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$Completable.lambda$complete$0(MakeAsyncHttpRequestStage.java:200) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:121) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:155) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: d14e06d2-9d4e-623e-855c-451efe8bd3b9)
2018-09-11 22:28:28.886  WARN 7 --- [nc-response-1-0] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000488: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ 2018-09-11T22:28:28.816030Z id: shardId-000000000488-2 -- CompletionException/software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: d14e06d2-9d4e-623e-855c-451efe8bd3b9)
2018-09-11 22:28:28.886 DEBUG 7 --- [nc-response-1-0] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000488: availableQueueSpace zeroing from 8```
pfifer commented 6 years ago

The value stored in the lease table is a sequence number for shard 438, and not for 488. We would need to find the last successful checkpoint that stored the sequence number to figure out who stored it.

Do you run the checkpoint on the processing thread, or do you execute the checkpoint on another thread?

I'm looking to see if we can add some additional validation to checkpointing.

pfifer commented 6 years ago

If you can check is that the sequence number in the DynamoDB table. If it isn't it makes me think the state lease state in the KCL is getting mixed up.

ryangardner commented 6 years ago

I'm not explicitly calling it on another thread, inside my ShardRecordProcessor in my

public void processRecords(ProcessRecordsInput processRecordsInput) { method I have it call a method after each userRecord is processed that is based largely on the way checkpointing is done in this sample: https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSampleRecordProcessor.java )

it calls:

   checkpointIfNecessary(processRecordsInput.checkpointer(), record);

passing in the checkpointer from the supplied ProcessRecordsInput

into this method:

    private void checkpointIfNecessary(RecordProcessorCheckpointer checkpointer, KinesisClientRecord record) {
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer, record);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }
    /**
     * Checkpoint with retries.
     *
     * @param checkpointer
     */
    private void checkpoint(RecordProcessorCheckpointer checkpointer, KinesisClientRecord record) {
        log.info("Checkpointing shard {} at sequenceNumber {}, subSequenceNumber {}", kinesisShardId, record.sequenceNumber(), record.subSequenceNumber());
        for (int i = 0; i < NUM_RETRIES; i++) {
            try {
                checkpointer.checkpoint(record.sequenceNumber(), record.subSequenceNumber());
                break;
            } catch (ShutdownException se) {
                // Ignore checkpoint if the processor instance has been shutdown (fail over).
                log.info("Caught shutdown exception, skipping checkpoint.", se);
                break;
            } catch (ThrottlingException e) {
                // Backoff and re-attempt checkpoint upon transient failures
                if (i >= (NUM_RETRIES - 1)) {
                    log.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
                    break;
                } else {
                    log.info("Transient issue when checkpointing - attempt " + (i + 1) + " of "
                            + NUM_RETRIES, e);
                }
            } catch (InvalidStateException e) {
                // This usually indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
                log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
                break;
            }
            try {
                Thread.sleep(BACKOFF_TIME_IN_MILLIS);
            } catch (InterruptedException e) {
                log.debug("Interrupted sleep", e);
            }
        }
    }

The biggest difference between the checkpointing that I have there and the sample is I'm explicitly passing in a sequence number to the checkpointer - but the checkpointer I'm using came from ProcessRecordsInput and the sequence / subsequence I'm checkpointing with also came from a record out of the list of KinesisClientRecords in processRecordsInput.records()

The code also does checkpoints at the shardEnd / shutdownRequested points:

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            log.info("Shard ended");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Error checkpointing after shard ended input", e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Error checkpointing after shutdown was requested", e);
        }
    }
ryangardner commented 6 years ago

In dynamo for that shardId-000000000488 I have:

{
  "checkpoint": {
    "S": "49588160296506554238249968084872546083315933702822828898"
  },
  "checkpointSubSequenceNumber": {
    "N": "0"
  },
  "leaseCounter": {
    "N": "1637"
  },
  "leaseKey": {
    "S": "shardId-000000000488"
  },
  "leaseOwner": {
    "S": "fa7c2ba8-8eca-4a6b-b7fc-d132afa3e4cc"
  },
  "ownerSwitchesSinceCheckpoint": {
    "N": "5"
  },
  "parentShardId": {
    "SS": [
      "shardId-000000000330"
    ]
  }
}

which matches what was in that line in the logs earlier today:

Initializing ShardId ShardInfo(shardId=shardId-000000000488, concurrencyToken=f1dc693a-be84-4252-848e-0d083d3646a9, parentShardIds=[shardId-000000000330], checkpoint={SequenceNumber: 49588160296506554238249968084872546083315933702822828898,SubsequenceNumber: 0})

(nothing has checkpointed it to a newer value even though the app is still running / processing things)

pfifer commented 6 years ago

That sequence number is for shard 438. That means it did checkpoint on the wrong sequence number. I'm not sure how that happened at this time. I'm working on a change that will validate sequence numbers before allowing a checkpoint to occur.

If you happened to log the sequence number when you checkpoint that would help to track this down. Concurrently I'm looking to add a validator that should prevent/detect checkpointing with a sequence number that isn't part of the shard.

ryangardner commented 6 years ago

I do have logs when it was checkpointing - first showing it checkpointing 488 with that sequence number and then showing it checkpointing 438 with the same sequence number:

2018-09-11 13:35:30.315  INFO 7 --- [dProcessor-0065] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000488 at sequenceNumber 49588160296506554238249968084872546083315933702822828898, subSequenceNumber 0

then later:

2018-09-11 13:36:02.675  WARN 7 --- [dProcessor-0065] s.a.kinesis.lifecycle.ShardConsumer      : shardId-000000000488: onError().  Cancelling subscription, and marking self as failed.
software.amazon.awssdk.services.kinesis.model.ResourceInUseException: Another active subscription exists for this consumer: 120985645295:dms-l1-ingest-nonprod:shardId-000000000438:kinesis-rds-loader (Service: kinesis, Status Code: 500, Request ID: d1392888-ea8c-6ab6-8528-ee577bd34331)

followed by:

2018-09-11 13:36:02.693  INFO 7 --- [dProcessor-0065] c.c.r.rdsloader.RdsProcessor             : Checkpointing shard shardId-000000000438 at sequenceNumber 49588160296506554238249968084872546083315933702822828898, subSequenceNumber 0

It seems that's the same thread (shardProcessor-0065) but with different shardIds. At first this seemed odd to me, but looking at the logs that thread seems to use several different ShardProcessors - and checkpoints other shards like:

2018-09-11 13:35:44.664 INFO 7 --- [dProcessor-0065] c.c.r.rdsloader.RdsProcessor : Checkpointing shard shardId-000000000352 at sequenceNumber 49588160132952888952226382505947315896701337430961165826, subSequenceNumber 0

athielen2 commented 6 years ago

FWIW, we recreated our stack last night and restarted the KCL 2.0 application for the stream. Everything was working as expected until we performed another deployment and the application was restarted again. Shortly after restart, we saw warnings and errors similar to what @ryangardner is seeing:

018-09-12T19:59:10,331 [WARN] software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher:128 - shardId-000000000006: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ 2018-09-12T19:58:34.998Z id: shardId-000000000006-3 -- software.amazon.awssdk.services.kinesis.model.ResourceInUseException: Another active subscription exists for this consumer: 308120811573:measnap-MeasurementSnapshotCaptorStreamV2:shardId-000000000007:measnap-MeasurementSnapshotCaptorV2 (Service: kinesis, Status Code: 500, Request ID: f67b65de-03a9-760e-a268-17f5af2738f0)
software.amazon.awssdk.services.kinesis.model.ResourceInUseException: Another active subscription exists for this consumer: 308120811573:measnap-MeasurementSnapshotCaptorStreamV2:shardId-000000000007:measnap-MeasurementSnapshotCaptorV2 (Service: kinesis, Status Code: 500, Request ID: f67b65de-03a9-760e-a268-17f5af2738f0) 
2018-09-12T19:59:10,334 [WARN] software.amazon.kinesis.lifecycle.ShardConsumer$InternalSubscriber:165 - shardId-000000000006: onError(). Cancelling subscription, and marking self as failed.
software.amazon.awssdk.services.kinesis.model.ResourceInUseException: Another active subscription exists for this consumer: 308120811573:measnap-MeasurementSnapshotCaptorStreamV2:shardId-000000000007:measnap-MeasurementSnapshotCaptorV2 (Service: kinesis, Status Code: 500, Request ID: f67b65de-03a9-760e-a268-17f5af2738f0)
2018-09-12T19:59:11,213 [WARN] software.amazon.kinesis.lifecycle.ShardConsumer:244 - shardId-000000000006: Failure occurred in retrieval. Restarting data requests
software.amazon.awssdk.services.kinesis.model.ResourceInUseException: Another active subscription exists for this consumer: 308120811573:measnap-MeasurementSnapshotCaptorStreamV2:shardId-000000000007:measnap-MeasurementSnapshotCaptorV2 (Service: kinesis, Status Code: 500, Request ID: f67b65de-03a9-760e-a268-17f5af2738f0)
2018-09-12T19:59:12,214 [WARN] software.amazon.kinesis.lifecycle.ShardConsumer:244 - shardId-000000000006: Failure occurred in retrieval. Restarting data requests
java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000007, while it was used in a call to a shard with shardId-000000000006 (Service: Kinesis, Status Code: 400, Request ID: f145195f-a870-67e7-a556-6b10d7b51775)
ryangardner commented 6 years ago

I started to dive into this a bit to see if I could find anything that looked like it might be involved. In looking at the code and reading your past comments, it seems like the problem starts when something checkpoints a sequence to the wrong shard.

In my logs the most odd thing is seeing that sequence ID that apparently encodes something for shardId 438 saving to shardId 488 and then later saving it to 438 - I was trying to figure out how that would be possible for the shard processor to get the wrong sequence to checkpoint with (since it is grabbing it out of the record that it is processing - and the shardInfo that it has for the processor is set up when the initialize is called)

I've been going over the code trying to understand how it works to see if there are any odd synchronization issues I could find and so far haven't found anything.

2018-09-11 13:36:58.271  WARN 7 --- [      Thread-22] s.a.kinesis.lifecycle.ShardConsumer      : shardId-000000000488: Failure occurred in retrieval.  Restarting data requests java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.InvalidArgumentException: Invalid StartingSequenceNumber. It encodes shardId-000000000438, while it was used in a call to a shard with shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: e4dbcd23-cdb0-d50c-b0ca-0a1a37765948)

and then right after it gets a Rate exceeded message:

2018-09-11 13:36:58.318  WARN 7 --- [c-response-1-16] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000488: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ 2018-09-11T13:36:58.271390Z id: shardId-000000000488-58 -- CompletionException/software.amazon.awssdk.services.kinesis.model.LimitExceededException: Rate exceeded for consumer arn:aws:kinesis:us-east-1:120985645295:stream/dms-l1-ingest-nonprod/consumer/kinesis-rds-loader:1533759671 and shard shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: da4d0058-eee8-33b0-8e5c-c762142ebff4) java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.LimitExceededException: Rate exceeded for consumer arn:aws:kinesis:us-east-1:120985645295:stream/dms-l1-ingest-nonprod/consumer/kinesis-rds-loader:1533759671 and shard shardId-000000000488 (Service: Kinesis, Status Code: 400, Request ID: da4d0058-eee8-33b0-8e5c-c762142ebff4)

but both of these are about a minute after it has checkpointed it with the wrong value so they are

And later on before it starts to log the error messages at a high volume, it logs:

2018-09-11 13:48:12.731 2 ERROR 7 --- [      Thread-22] s.a.kinesis.lifecycle.ShardConsumer      : shardId-000000000488: Last request was dispatched at 2018-09-11T13:35:58.711581Z, but no response as of 2018-09-11T13:48:12.731138Z (PT12M14.019557S).  Cancelling subscription, and restarting

After it restarts it it starts to log that error message.

The only message in my logs close to the 13:35:58 timestamp it says it had the request dispatched was the 13:35:30 one where it was checkpointing that 488 with the wrong sequenceId. It seems that after it did that bad checkpoint, that one shard didn't get anything more back.

Other things that might be worth noting - around this same time I see that other shards are getting 500 errors back from kinesis and restarting:

One with a duplicate handler name error:

2018-09-11 13:30:33.203 ERROR 7 --- [tyEventLoop-0-2] s.a.a.h.n.n.internal.ResponseHandler     : Exception processing request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=https, host=kinesis.us-east-1.amazonaws.com, encodedPath=/, headers=[amz-sdk-invocation-id, amz-sdk-retry, Authorization, Content-Length, Content-Type, Host, User-Agent, X-Amz-Date, X-Amz-Security-Token, X-Amz-Target], queryParameters=[]) java.lang.IllegalArgumentException: Duplicate handler name: HttpStreamsClientHandler#0-body-publisher

and another one that looks like it's just a 500 error:

2018-09-11 13:30:33.138  WARN 7 --- [      Thread-22] s.a.kinesis.lifecycle.ShardConsumer      : shardId-000000000493: Failure occurred in retrieval.  Restarting data requests software.amazon.awssdk.services.kinesis.model.InternalFailureException: Internal Service Error (Service: kinesis, Status Code: 500, Request ID: ed8e5d30-10ad-5fc4-b99f-9883ea3eab95)

Is there anything else I can do to help pinpoint the problem with this?

pfifer commented 6 years ago

I'm working on a change that will validate each sequence number in the response from the FanOutRecordsProducer. I'm just starting testing, and will be looking to do a SNAPSHOT release soon. I want to get it out today or tonight to allow for some testing.

kcolberg commented 6 years ago

Hey Justin - read the exchanges between you and Ryan, and I was wondering if there was any progress. I work with Ryan and I need to get a status to our product folks on the issue we're seeing as it's impacting the timing on an alpha release. Thanks - Kevin

pfifer commented 6 years ago

This issue is currently our top priority. We have had some difficulty in recreating this issue. We have had several other customers using the 2.0 KCL who have not experienced this issue so we are looking at might be unique or different with your setup.

We are currently working on two parallel activities to solve the problem. First, we are adding additional debug information and checks which will be available Monday. This additional debug information should provide a detect when this issue first occurs provide more insight into the events leading up to it. Second, we are continuing to try and recreate the issues internally. We are also working with the AWS SDK team to assist in investigating from a different angle.

We will continue to update the issue as our investigation proceeds. We're also happy for any insights or questions you might have to help us track down the issue.

pfifer commented 6 years ago

A couple of questions for everyone currently impacted:

  1. Are you running on EC2, and if yes what region?
  2. If you're using EC2 what instance type are you using?
  3. What is the output from mvn dependency:tree (this just needs the KCL, AWS SDK and netty dependencies)?
  4. Anything you might think we should include in our testing

Thanks

ryangardner commented 6 years ago
  1. We're running on an ECS cluster on top of EC2 instances in us-east-1
  2. The EC2 instances that make up the ECS cluster are m4.2xlarge
  3. Here's what I have in the dependency tree:
[INFO] |  +- software.amazon.awssdk:kinesis:jar:2.0.1:compile
[INFO] |  |  +- software.amazon.awssdk:sdk-core:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
[INFO] |  |  +- software.amazon.awssdk:auth:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
[INFO] |  |  |  \- software.amazon.awssdk:profiles:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
[INFO] |  |  +- software.amazon.awssdk:http-client-spi:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
[INFO] |  |  +- software.amazon.awssdk:regions:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
[INFO] |  |  |  \- com.fasterxml.jackson.jr:jackson-jr-objects:jar:2.9.5:compile
[INFO] |  |  +- software.amazon.awssdk:annotations:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
[INFO] |  |  +- software.amazon.awssdk:utils:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
[INFO] |  |  +- software.amazon.awssdk:aws-core:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
[INFO] |  |  |  \- software.amazon:flow:jar:1.1:compile (version selected from constraint [1.1,1.1])
[INFO] |  |  \- software.amazon.awssdk:apache-client:jar:2.0.1:runtime (version selected from constraint [2.0.1,2.0.1])
[INFO] |  +- software.amazon.awssdk:dynamodb:jar:2.0.1:compile
[INFO] |  +- software.amazon.awssdk:cloudwatch:jar:2.0.1:compile
[INFO] |  +- software.amazon.awssdk:netty-nio-client:jar:2.0.1:compile
[INFO] |  |  +- io.netty:netty-codec-http:jar:4.1.23.Final:compile
[INFO] |  |  +- io.netty:netty-codec-http2:jar:4.1.23.Final:compile
[INFO] |  |  +- io.netty:netty-codec:jar:4.1.23.Final:compile
[INFO] |  |  +- io.netty:netty-transport:jar:4.1.23.Final:compile
[INFO] |  |  |  \- io.netty:netty-resolver:jar:4.1.23.Final:compile
[INFO] |  |  +- io.netty:netty-common:jar:4.1.23.Final:compile
[INFO] |  |  +- io.netty:netty-buffer:jar:4.1.23.Final:compile
[INFO] |  |  +- io.netty:netty-handler:jar:4.1.23.Final:compile
[INFO] |  |  +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.23.Final:compile
[INFO] |  |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.23.Final:compile
[INFO] |  |  +- com.typesafe.netty:netty-reactive-streams-http:jar:2.0.0:compile
[INFO] |  |  |  \- com.typesafe.netty:netty-reactive-streams:jar:2.0.0:compile
[INFO] |  |  \- org.reactivestreams:reactive-streams:jar:1.0.2:compile
[INFO] |  +- com.google.guava:guava:jar:18.0:compile
[INFO] |  +- com.google.protobuf:protobuf-java:jar:2.6.1:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.7:compile
[INFO] |  \- io.reactivex.rxjava2:rxjava:jar:2.1.12:compile
[INFO] +- com.amazonaws:aws-java-sdk-rds:jar:1.11.306:compile
[INFO] |  +- com.amazonaws:aws-java-sdk-core:jar:1.11.306:compile
[INFO] |  |  +- commons-logging:commons-logging:jar:1.1.3:compile
[INFO] |  |  +- software.amazon.ion:ion-java:jar:1.0.2:compile
[INFO] |  |  +- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.9.5:compile
[INFO] |  |  \- joda-time:joda-time:jar:2.9.9:compile
[INFO] |  \- com.amazonaws:jmespath-java:jar:1.11.306:compile
[INFO] +- vc.inreach.aws:aws-signing-request-interceptor:jar:0.0.20:compile
[INFO] |  +- commons-codec:commons-codec:jar:1.11:compile
[INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.4.9:compile
[INFO] |  \- org.apache.httpcomponents:httpclient:jar:4.5.5:compile

We have around 10-20 instances of one application using the KCL to consume off of these shards.

athielen2 commented 6 years ago
  1. Yes, we are running on EC2 in regions us-east-1 (availability zones a and c)
  2. c5.large
  3. Dependency tree output:
    [INFO] +- org.apache.commons:commons-collections4:jar:4.2:compile
    [INFO] +- org.apache.httpcomponents:httpclient:jar:4.5.6:compile
    [INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.4.10:compile
    [INFO] |  +- commons-logging:commons-logging:jar:1.2:compile
    [INFO] |  \- commons-codec:commons-codec:jar:1.10:compile
    [INFO] +- org.apache.logging.log4j:log4j-api:jar:2.11.1:compile
    [INFO] +- org.apache.logging.log4j:log4j-core:jar:2.11.1:compile
    [INFO] +- org.apache.logging.log4j:log4j-slf4j-impl:jar:2.11.1:compile
    [INFO] |  \- org.slf4j:slf4j-api:jar:1.7.25:compile
    [INFO] +- software.amazon.kinesis:amazon-kinesis-client:jar:2.0.2:compile
    [INFO] |  +- software.amazon.awssdk:kinesis:jar:2.0.1:compile
    [INFO] |  |  +- software.amazon.awssdk:sdk-core:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
    [INFO] |  |  |  +- software.amazon.ion:ion-java:jar:1.2.0:compile
    [INFO] |  |  |  +- com.fasterxml.jackson.core:jackson-core:jar:2.9.6:compile
    [INFO] |  |  |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.6:compile
    [INFO] |  |  |  \- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.9.6:compile
    [INFO] |  |  +- software.amazon.awssdk:auth:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
    [INFO] |  |  |  \- software.amazon.awssdk:profiles:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
    [INFO] |  |  +- software.amazon.awssdk:http-client-spi:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
    [INFO] |  |  +- software.amazon.awssdk:regions:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
    [INFO] |  |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.6:compile
    [INFO] |  |  |  \- com.fasterxml.jackson.jr:jackson-jr-objects:jar:2.9.6:compile
    [INFO] |  |  +- software.amazon.awssdk:annotations:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
    [INFO] |  |  +- software.amazon.awssdk:utils:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
    [INFO] |  |  +- software.amazon.awssdk:aws-core:jar:2.0.1:compile (version selected from constraint [2.0.1,2.0.1])
    [INFO] |  |  |  \- software.amazon:flow:jar:1.1:compile (version selected from constraint [1.1,1.1])
    [INFO] |  |  \- software.amazon.awssdk:apache-client:jar:2.0.1:runtime (version selected from constraint [2.0.1,2.0.1])
    [INFO] |  +- software.amazon.awssdk:dynamodb:jar:2.0.1:compile
    [INFO] |  +- software.amazon.awssdk:cloudwatch:jar:2.0.1:compile
    [INFO] |  +- software.amazon.awssdk:netty-nio-client:jar:2.0.1:compile
    [INFO] |  |  +- io.netty:netty-codec-http:jar:4.1.26.Final:compile
    [INFO] |  |  +- io.netty:netty-codec-http2:jar:4.1.26.Final:compile
    [INFO] |  |  +- io.netty:netty-codec:jar:4.1.26.Final:compile
    [INFO] |  |  +- io.netty:netty-transport:jar:4.1.26.Final:compile
    [INFO] |  |  |  \- io.netty:netty-resolver:jar:4.1.26.Final:compile
    [INFO] |  |  +- io.netty:netty-common:jar:4.1.26.Final:compile
    [INFO] |  |  +- io.netty:netty-buffer:jar:4.1.26.Final:compile
    [INFO] |  |  +- io.netty:netty-handler:jar:4.1.26.Final:compile
    [INFO] |  |  +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.26.Final:compile
    [INFO] |  |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.26.Final:compile
    [INFO] |  |  +- com.typesafe.netty:netty-reactive-streams-http:jar:2.0.0:compile
    [INFO] |  |  |  \- com.typesafe.netty:netty-reactive-streams:jar:2.0.0:compile
    [INFO] |  |  \- org.reactivestreams:reactive-streams:jar:1.0.2:compile
    [INFO] |  +- com.google.guava:guava:jar:18.0:compile
    [INFO] |  +- com.google.protobuf:protobuf-java:jar:2.6.1:compile
    [INFO] |  +- org.apache.commons:commons-lang3:jar:3.7:compile
    [INFO] |  \- io.reactivex.rxjava2:rxjava:jar:2.1.14:compile
    [INFO] +- com.google.code.gson:gson:jar:2.8.5:compile
    [INFO] +- joda-time:joda-time:jar:2.10:compile
    [INFO] +- org.junit.jupiter:junit-jupiter-engine:jar:5.2.0:test
    [INFO] |  +- org.apiguardian:apiguardian-api:jar:1.0.0:test
    [INFO] |  +- org.junit.platform:junit-platform-engine:jar:1.2.0:test
    [INFO] |  |  +- org.junit.platform:junit-platform-commons:jar:1.2.0:test
    [INFO] |  |  \- org.opentest4j:opentest4j:jar:1.1.0:test
    [INFO] |  \- org.junit.jupiter:junit-jupiter-api:jar:5.2.0:test
    [INFO] +- org.mockito:mockito-core:jar:2.22.0:test
    [INFO] |  +- net.bytebuddy:byte-buddy:jar:1.8.21:test
    [INFO] |  +- net.bytebuddy:byte-buddy-agent:jar:1.8.21:test
    [INFO] |  \- org.objenesis:objenesis:jar:2.6:test
    [INFO] \- commons-io:commons-io:jar:2.6:test
pfifer commented 6 years ago

Thanks for providing this. I'm going to continue trying to track down what is happening.

pfifer commented 6 years ago

Another question I forgot to ask:

ryangardner commented 6 years ago

Another question I forgot to ask:

  • What OS are you using on the instances?

We're based on ami: amzn-ami-2018.03.d-amazon-ecs-optimized (ami-112e366e) which has the description: Amazon Linux AMI 2018.03.d x86_64 ECS HVM GP2 we could update this to a slightly newer AMI if it would be helpful.

(Also, I double checked: on one of our accounts the EC2 instance type is m5.4xlarge and the other one is m4.4xlarge but both have the same issue)

ryangardner commented 6 years ago

If it would be helpful, we could open up a support ticket with our enterprise support people and work on a way to get you access to our actual EC2 instances to get at the docker containers / JVMs themselves to attach to them and analyze anything (I think that'd might be the easiest way to get things going to get you access to these systems?)

pfifer commented 6 years ago

Opening a case would be helpful as well.

pfifer commented 6 years ago

I've just pushed a PR (#400) that adds the sequence number validator that should detect sequence numbers that don't originate from the shard that the FanOutRecordsPublisher is processing. The PR should be merged later today, and I'll look at doing a snapshot release tonight.

You will need to enable the validation by creation a FanOutConfig, and setting validateRecordsAreForShard(true) on it. The config can then be assigned to the RetrievalConfig#retrievalSpecificConfig.

athielen2 commented 6 years ago

We use a "corporate" AMI, but it is an Ubuntu image (ubuntu-xenial-16.04-amd64-server)

pfifer commented 6 years ago

I won't be able to get the change published to snapshots tonight. I will look into it tomorrow.

pfifer commented 6 years ago

It appears likely that I will have to do a full release for the validation changes. I'm also waiting on a change for the SDK to enable logging of requestIds.

kcolberg commented 6 years ago

Thanks Justin - is there a timeline?

ryangardner commented 6 years ago

I built a snapshot of this and pushed it out. I see it logging the new debug messages but haven't seen it log the validation failure message yet. There are some pre-existing shards in a borked state. I will edit our lease table manually and set those shards to TRIM_HORIZON and see if that helps get things moving again.

As a temporary workaround, we're making our publisher put the same data into kinesis with three different shardIds (we can handle duplicate data in our application) so that the likelihood of any single piece of data getting stuck in three broken shards out of the 160 we have going is extremely low - I'll post any updates with logs if those validation-related error messages to appear.

pfifer commented 6 years ago

I'm currently testing for a full release. I'm going to move the validation to an experimental form, since I really don't want it to be there forever. My aim is to have the release in process by the end of today.

athielen2 commented 6 years ago

@pfifer, I see that the experimental build was released. I am assuming that we will need to delete our corrupted tables in order for this to start working again. Is that assumption correct?

ryangardner commented 6 years ago

After running it (built on the branch) for almost 24 hours now, I don't see any messages from the new logging being logged:

I don't see any Received records destined for different shards or A problem occurred while validating sequence numbers

I see plenty of errors like this with FanOutRecordsPublisher#errorOccurred:

2018-09-19 13:06:59.333  WARN 8 --- [-response-1-293] s.a.k.r.fanout.FanOutRecordsPublisher    : shardId-000000000353: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ 2018-09-19T13:06:59.321319Z id: shardId-000000000353-266 -- CompletionException/SdkClientException/java.lang.IllegalArgumentException: Duplicate handler name: HttpStreamsClientHandler#0-body-publisher
java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:155) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:121) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[na:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$Completable.lambda$completeExceptionally$1(MakeAsyncHttpRequestStage.java:208) ~[sdk-core-2.0.1.jar!/:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]
Caused by: software.amazon.awssdk.core.exception.SdkClientException: null
    at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97) ~[sdk-core-2.0.1.jar!/:na]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:143) ~[sdk-core-2.0.1.jar!/:na]
    ... 9 common frames omitted
Caused by: java.lang.IllegalArgumentException: Duplicate handler name: HttpStreamsClientHandler#0-body-publisher
    at io.netty.channel.DefaultChannelPipeline.checkDuplicateName(DefaultChannelPipeline.java:1101) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.filterName(DefaultChannelPipeline.java:302) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.addAfter(DefaultChannelPipeline.java:319) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.addAfter(DefaultChannelPipeline.java:308) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:161) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148) ~[netty-reactive-streams-http-2.0.0.jar!/:na]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at com.typesafe.netty.HandlerPublisher.channelRead(HandlerPublisher.java:356) ~[netty-reactive-streams-2.0.0.jar!/:na]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1407) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1177) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1221) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) ~[netty-codec-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:545) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) ~[netty-transport-4.1.23.Final.jar!/:4.1.23.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar!/:4.1.23.Final]
    ... 1 common frames omitted

shard 353 is not one of the ones that is in the stuck state with the incorrect sequence / shard IDs - so the 500 error is probably not related to the same thing.

There isn't a lot of data flowing in right now, I'll trigger some more processing today to see if these shards that get 500 errors ever recover and receive data - I think in the past that they have

... What other log messages should I look for?

My application is set to use LATEST if there is no entry in the lease table - and as of now all the data is processed through that stream and I could completely delete all the entries in the dynamo state and start a process to push a large amount of data through it if it might help get to the bottom of this. I did see new shards pop up in the list of ones that the ShardConsumer complained about having a mismatch over the last 24 hours but none of those new error messages in the fanout publisher being able to validate it...

To double check, my configuration is:

       Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig()
                        .skipShardSyncAtWorkerInitializationIfLeasesExist(true),
                configsBuilder.leaseManagementConfig()
                        .maxLeaseRenewalThreads(80)
                        .failoverTimeMillis(TimeUnit.MINUTES.toMillis(3)),
                configsBuilder.lifecycleConfig()
                        // had to disable this because of: https://github.com/awslabs/amazon-kinesis-client/issues/381
                        //.logWarningForTaskAfterMillis(Optional.of(TimeUnit.SECONDS.toMillis(30))),
                ,
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(
                        // log when things are bad ( see https://github.com/awslabs/amazon-kinesis-client/issues/391 )
                        new FanOutConfig(configsBuilder.kinesisClient())
                                .validateRecordsAreForShard(true)
                            .streamName(configsBuilder.streamName())
                            .applicationName(configsBuilder.applicationName())

                )
pfifer commented 6 years ago

For the experimental version just released I moved the validation to a different configuration software.amazon.kinesis.retrieval.fanout.experimental.ExperimentalFanOutConfig. You should be able to replace

configsBuilder.retrievalConfig().retrievalSpecificConfig(
                        // log when things are bad ( see https://github.com/awslabs/amazon-kinesis-client/issues/391 )
                        new FanOutConfig(configsBuilder.kinesisClient())
                                .validateRecordsAreForShard(true)
                            .streamName(configsBuilder.streamName())
                            .applicationName(configsBuilder.applicationName())

with

configsBuilder.retrievalConfig().retrievalSpecificConfig(
                        // log when things are bad ( see https://github.com/awslabs/amazon-kinesis-client/issues/391 )
                        new ExperimentalFanOutConfig(configsBuilder.kinesisClient())
                            .streamName(configsBuilder.streamName())
                            .applicationName(configsBuilder.applicationName())

Additionally if possible can you add the SequenceNumberValidator to your record processor to try and catch cases where the records didn't originate from the FanOutRecordsPublisher. Here is the code I'm using to test with:

package software.amazon.kinesis.sample;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.checkpoint.SequenceNumberValidator;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShardIdCheck {

    private static final Logger log = LoggerFactory.getLogger(ShardIdCheck.class);

    final SequenceNumberValidator validator = new SequenceNumberValidator();

    public Set<String> checkShardIds(String shardId, Stream<String> sequenceNumbers) {
        List<Optional<String>> possibleShardIds = sequenceNumbers.map(validator::shardIdFor).collect(Collectors.toList());
        long missingShardIds = possibleShardIds.stream().filter(o -> !o.isPresent()).count();
        Set<String> shardIds = possibleShardIds.stream().filter(Optional::isPresent).map(Optional::get)
                .collect(Collectors.toSet());

        if (missingShardIds > 0) {
            log.warn("Unable to extract shardIds from some {} records", missingShardIds);
        }
        if (shardIds.stream().anyMatch(s -> !StringUtils.equalsIgnoreCase(s, shardId))) {
            log.warn("Found the following mismatched shardIds: {}",
                    shardIds.stream().collect(Collectors.joining(", ")));
        }

        return shardIds;
    }

}

Returning the seen shard id's is done to allow my record processor to log all the shard id's its seen. At the same time it also logs the shardId it's processing.

pfifer commented 6 years ago

@athielen2 You don't need to delete the table, you can also remove all the leases and they will be recreated. The steps I usually follow when testing

  1. Stop the application entirely
  2. Delete the contents of the lease table
  3. Restart the application

At startup the application will see that there are no shards in the table and will retrieve the shard list from Kinesis and recreate the leases from the configured initial position. The primary reason I do this is to avoid waiting for the table to be created.