The commit message that you posted has the fix for the Polling configuration for the KCL. You seem to be using Enhanced Fanout if you are seeing the SubscribeToShardEvent.MillisBehindLatest metric. Can you confirm the configuration that you are using? By default, KCL 2.x uses the Enhanced Fanout feature.
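For context, a minimal sketch (assuming the standard KCL 2.x ConfigsBuilder wiring, not taken from this thread) of how the retrieval mode is chosen: if retrievalSpecificConfig is never set, the consumer defaults to enhanced fan-out, while supplying a PollingConfig switches it to GetRecords polling. The stream name, client, and configsBuilder here are placeholders assumed to be set up elsewhere.

```java
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

public class RetrievalModeExample {

    // Default behaviour: no retrievalSpecificConfig set, so KCL 2.x uses enhanced fan-out.
    static RetrievalConfig fanOutRetrieval(ConfigsBuilder configsBuilder) {
        return configsBuilder.retrievalConfig();
    }

    // Explicitly switch to polling (GetRecords) by supplying a PollingConfig.
    static RetrievalConfig pollingRetrieval(ConfigsBuilder configsBuilder,
                                            String streamName,
                                            KinesisAsyncClient kinesisClient) {
        return configsBuilder.retrievalConfig()
                .retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    }
}
```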
Yes we are using the Enhanced Fanout feature.
We are using KCL 2.0.5 and seeing a lot of these messages too, for example:
DateTime=2019-01-28 20:54:17,267 Application=myapp Thread=[kinesis-scheduler1] Logger=software.amazon.kinesis.lifecycle.ShardConsumer Type=ERROR Message=shardId-000000000001: Last request was dispatched at 2019-01-28T20:53:42.247Z, but no response as of 2019-01-28T20:54:17.267Z (PT35.02S). Cancelling subscription, and restarting.
The 35-second value seems to be hard-coded in ShardConsumer#MAX_TIME_BETWEEN_REQUEST_RESPONSE, so we don't have control over it.
There are also warnings like this, but I don't know if the two are related:
DateTime=2019-01-28 21:27:40,571 Application=myapp Thread=[ShardRecordProcessor-0078] Logger=software.amazon.kinesis.lifecycle.ShardConsumer Type=WARN Message=shardId-000000000063: onError(). Cancelling subscription, and marking self as failed.
software.amazon.kinesis.retrieval.RetryableRetrievalException: ReadTimeout
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.errorOccurred(FanOutRecordsPublisher.java:142)
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.access$700(FanOutRecordsPublisher.java:51)
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.exceptionOccurred(FanOutRecordsPublisher.java:516)
at software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.lambda$subscribeToShard$1(DefaultKinesisAsyncClient.java:2102)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:155)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:121)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$Completable.lambda$completeExceptionally$1(MakeAsyncHttpRequestStage.java:208)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: software.amazon.awssdk.core.exception.SdkClientException
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:143)
... 9 more
Caused by: io.netty.handler.timeout.ReadTimeoutException
Could you try running the application with KCL 2.1.1?
I get a similar issue where I wait for MAX_TIME_BETWEEN_REQUEST_RESPONSE to be hit. I have a single shard with an enhanced fan-out consumer, using the latest KCL (2.1.1).
Logs:
18:24:36.432 [KCL 2.0+ Scheduler Thread] WARN s.a.k.lifecycle.ShardConsumer - Previous PROCESS task still pending for shard shardId-000000000000 since PT35.87S ago.
18:24:37.433 [KCL 2.0+ Scheduler Thread] WARN s.a.k.lifecycle.ShardConsumer - Last time data arrived: 2019-02-13T18:24:00.562Z (PT36.87S)
...
many many repeated warnings about last time data arrived
...
then finally...
WARN s.a.k.r.f.FanOutRecordsPublisher - shardId-000000000000: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request(6), because subscribers don't match.
18:28:09.987 [ShardRecordProcessor-0003] WARN s.a.k.lifecycle.ShardConsumer - shardId-000000000000: onError(). Cancelling subscription, and marking self as failed.
Caused by: io.netty.handler.timeout.ReadTimeoutException: null
software.amazon.kinesis.retrieval.RetryableRetrievalException: ReadTimeout
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.errorOccurred(FanOutRecordsPublisher.java:142)
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.access$700(FanOutRecordsPublisher.java:51)
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.exceptionOccurred(FanOutRecordsPublisher.java:516)
at software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.lambda$subscribeToShard$1(DefaultKinesisAsyncClient.java:2238)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
... truncated stack trace...
It ends up being a 5-10 minute outage in receiving enhanced fan-out records. The Kinesis stream never stopped having records put to it during this time. Despite all this, the 'millisBehind' metric for the enhanced consumer sits at 0, which seems wrong to me.
Here are some examples of the metrics in CloudWatch for the enhanced fan-out consumer:
@ShibaBandit
How long did the call to processRecords take? The warning you're getting is telling you that the call to processRecords has taken more than the configured time; in this case it looks like 35 seconds. The "last time data arrived" warning happens to be emitted at the same time. If your processRecords was blocked for 5 to 10 minutes, then the KCL will need to wait for your record processor to finish before getting more data from Kinesis. This is the break you see in the metrics, and the read timeout.
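For reference, a minimal sketch (assuming the KCL 2.x ShardRecordProcessor interface) of timing processRecords so you can see whether a batch is approaching the 35-second limit; TimedRecordProcessor and doBusinessLogic are hypothetical names standing in for your own processor and logic.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class TimedRecordProcessor implements ShardRecordProcessor {
    private static final Logger log = LoggerFactory.getLogger(TimedRecordProcessor.class);
    private static final long WARN_THRESHOLD_MILLIS = 30_000; // stay well under the 35s limit
    private String shardId;

    @Override
    public void initialize(InitializationInput input) {
        shardId = input.shardId();
    }

    @Override
    public void processRecords(ProcessRecordsInput input) {
        long start = System.currentTimeMillis();
        try {
            for (KinesisClientRecord record : input.records()) {
                doBusinessLogic(record); // hypothetical placeholder for your own processing
            }
        } finally {
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed > WARN_THRESHOLD_MILLIS) {
                log.warn("{}: processRecords took {} ms for {} records",
                        shardId, elapsed, input.records().size());
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput input) { }

    @Override
    public void shardEnded(ShardEndedInput input) {
        try {
            input.checkpointer().checkpoint(); // required before the shard can be closed
        } catch (Exception e) {
            log.error("{}: checkpoint at shard end failed", shardId, e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput input) { }

    private void doBusinessLogic(KinesisClientRecord record) {
        // placeholder for per-record work
    }
}
```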
@pfifer I'll check the processRecords time; it could be on my end or downstream if that's the case.
KCL 2.1.1 seems like it might be better. I don't see anything related to MAX_TIME_BETWEEN_REQUEST_RESPONSE now. The only warning messages appearing now are things like the ones below (stack traces omitted):
Thread=[kinesis-scheduler1] Logger=software.amazon.kinesis.lifecycle.ShardConsumer Type=WARN Message=shardId-000000000126: Failure occurred in retrieval. Restarting data requests
java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ResourceInUseException: Another active subscription exists for this consumer: 111248436868:myconsumer:shardId-000000000126:myapp (Service: Kinesis, Status Code: 400, Request ID: f6794ca6-9a06-8e70-a29d-f92301fa46be)
Thread=[ws-java-sdk-NettyEventLoop-1-5] Logger=software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher Type=WARN Message=shardId-000000000135: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ 2019-02-18T13:43:02.957Z id: shardId-000000000135-96 -- software.amazon.awssdk.services.kinesis.model.InternalFailureException: Internal Service Error (Service: kinesis, Status Code: 500, Request ID: ee582445-b91b-8bb6-babc-9ce3dc2153f6)
software.amazon.awssdk.services.kinesis.model.InternalFailureException: Internal Service Error (Service: kinesis, Status Code: 500, Request ID: ee582445-b91b-8bb6-babc-9ce3dc2153f6)
Also got one of these:
DateTime=2019-02-18T13:25:29,161 Thread=[kinesis-scheduler1] Logger=software.amazon.awssdk.http.nio.netty.internal.ResponseHandler Type=ERROR Message=Could not release channel back to the pool
java.util.NoSuchElementException: io.netty.handler.timeout.ReadTimeoutHandler
at io.netty.channel.DefaultChannelPipeline.getContextOrDie(DefaultChannelPipeline.java:1137) ~[netty-transport-4.1.32.Final.jar!/:4.1.32.Final]
at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:457) ~[netty-transport-4.1.32.Final.jar!/:4.1.32.Final]
at software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.removeIfExists(ChannelUtils.java:36) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.HandlerRemovingChannelPool.removePerRequestHandlers(HandlerRemovingChannelPool.java:73) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.HandlerRemovingChannelPool.release(HandlerRemovingChannelPool.java:56) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.ReleaseOnceChannelPool.release(ReleaseOnceChannelPool.java:67) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.closeAndRelease(ResponseHandler.java:153) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$1000(ResponseHandler.java:64) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.lambda$onCancel$0(ResponseHandler.java:231) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:164) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$700(ResponseHandler.java:64) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onCancel(ResponseHandler.java:230) ~[netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$OnCancelSubscription.cancel(ResponseHandler.java:298) [netty-nio-client-2.4.0.jar!/:?]
at software.amazon.awssdk.awscore.eventstream.EventStreamAsyncResponseTransformer$EventPublisher$1.cancel(EventStreamAsyncResponseTransformer.java:428) [aws-core-2.4.0.jar!/:?]
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordSubscription.cancel(FanOutRecordsPublisher.java:608) [amazon-kinesis-client-2.1.1.jar!/:?]
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.cancel(FanOutRecordsPublisher.java:560) [amazon-kinesis-client-2.1.1.jar!/:?]
at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$1.cancel(FanOutRecordsPublisher.java:397) [amazon-kinesis-client-2.1.1.jar!/:?]
at io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:189) [rxjava-2.1.14.jar!/:?]
at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.cancel(FlowableSubscribeOn.java:141) [rxjava-2.1.14.jar!/:?]
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.cancel(FlowableObserveOn.java:154) [rxjava-2.1.14.jar!/:?]
at io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:189) [rxjava-2.1.14.jar!/:?]
at io.reactivex.internal.subscribers.StrictSubscriber.cancel(StrictSubscriber.java:77) [rxjava-2.1.14.jar!/:?]
at software.amazon.kinesis.lifecycle.ShardConsumer$InternalSubscriber.cancel(ShardConsumer.java:183) [amazon-kinesis-client-2.1.1.jar!/:?]
at software.amazon.kinesis.lifecycle.ShardConsumer.leaseLost(ShardConsumer.java:486) [amazon-kinesis-client-2.1.1.jar!/:?]
at software.amazon.kinesis.coordinator.Scheduler.cleanupShardConsumers(Scheduler.java:605) [amazon-kinesis-client-2.1.1.jar!/:?]
at software.amazon.kinesis.coordinator.Scheduler.runProcessLoop(Scheduler.java:306) [amazon-kinesis-client-2.1.1.jar!/:?]
at software.amazon.kinesis.coordinator.Scheduler.run(Scheduler.java:222) [amazon-kinesis-client-2.1.1.jar!/:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_162]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_162]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
I'm talking with the SDK team to investigate the NoSuchElementException.
The other two exceptions can occur, but should be automatically retried. We're considering changing how some of the exceptions are reported and retried.
We're also seeing "shardId-000000000000: Last request was dispatched at 2019-02-26T01:27:11.408135Z, but no response as of 2019-02-26T01:28:06.097911Z (PT54.689776S). Cancelling subscription, and restarting." and then our consumer is "dead in the water" and does not consume messages until we restart it.
We're using KCL version 2.1.1. This specific system is one of our development setups and is idle during night time. Come morning, no new messages are consumed and we have to restart our service.
We're talking about a minuscule throughput - and not using fan out.
This is how the metrics look just before the record "jam" is unblocked.
We are seeing the same issue in our Spark and Java consumers. Our KCL version is 2.1.3.
Sometimes the ShardConsumer takes longer than 35 seconds (hardcoded in MAX_TIME_BETWEEN_REQUEST_RESPONSE) to process records, as our processing is based on custom business logic.
So, is there any way to configure/change MAX_TIME_BETWEEN_REQUEST_RESPONSE in the KCL library?
@pfifer As mentioned by yotamshtosselfg, we are also experiencing the same exception: "software.amazon.kinesis.lifecycle.ShardConsumer - shardId-000000000003: Last request was dispatched at 2019-07-29T06:37:19.918Z, but no response as of 2019-07-29T06:37:54.921Z (PT35.003S). Cancelling subscription, and restarting." But in our case the consumer is still able to consume events, no event loss has happened yet, and we didn't have to restart our application.
We're using KCL version 2.0.4 with fan-out enabled, and we saw this happen 2-3 times in production, but each time the throughput was very low.
Can this cause any loss of events by the KCL, or can we ignore it?
@pfifer Were you able to get any info on the NoSuchElementException?
What to expect from KCL release 2.2.2?
Case 1: If you are consuming a KCL 2.x version (prior to KCL 2.2.2) and you replaced or modified the thread/queue configuration of SchedulerThreadPoolExecutor and faced situations like data loss, shards getting stuck, or intermittent pauses in data consumption, please continue reading; otherwise go to the next case. These issues are potentially due to undelivered executor service events.
a. This release ensures data loss does not happen by preventing subsequent events from being delivered after an event delivery failure and later restarting the subscription. This may, however, lead to intermittent pauses in data consumption, as KCL checks for exceptionally completed subscriptions every 35 seconds. Refer to section d for mitigation. This scenario is identified by the following log messages:
- ERROR s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000035: Received unexpected ack for the active subscription shardId-000000000077-005. Throwing.
- ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000035: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S). Cancelling subscription, and restarting.
b. This release prevents the shard-stuck situations that happened due to undelivered control plane messages. [NOTE: Control plane messages are responsible for fetching further events from the KDS service. This includes the subscribeToShard() API call and the reactive layer's request(N) calls.]
c. This release has improved logging that will capture undelivered control plane messages for troubleshooting. Any undelivered control plane message might still lead to a temporary pause in data consumption. Refer to section d for mitigation. This is identified by the following log message:
- ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000035: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S). Cancelling subscription, and restarting.
d. Mitigation for undelivered messages: The undelivered messages are primarily due to reduced SchedulerThreadPoolExecutor capacity. Customers are expected to assess the state of the SchedulerThreadPoolExecutor using the following diagnostic log messages and take appropriate actions such as reducing the RecordProcessor's processRecords time, scaling out the application, or increasing the number of threads in the executor.
i. ExecutorStateEvent INFO log emitted every 30 seconds to check if activeThreads and queueSize are consistently high.
- INFO s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=100, coreThreads=0, leasesOwned=40, largestPoolSize=2, maximumPoolSize=2147483647)
ii. RejectedTaskEvent ERROR log emitted when SchedulerThreadPoolExecutor fails to execute any event.
- ERROR s.a.k.c.DiagnosticEventLogger [NONE] - Review your thread configuration to prevent task rejections. Task rejections will slow down your application and some shards may stop processing. Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=0, largestPoolSize=0, maximumPoolSize=2147483647) - io.reactivex.exceptions.UndeliverableException: java.util.concurrent.RejectedExecutionException: Test exception.
Case 2: If you are consuming a KCL 2.x version (prior to KCL 2.2.2) and you did NOT modify or replace SchedulerThreadPoolExecutor and faced shard-stuck situations or frequent intermittent pauses in data consumption, please continue reading; otherwise go to the next case. These issues are potentially due to submitting more tasks to SchedulerThreadPoolExecutor than it can handle, leading to delayed execution of submitted tasks.
a. The common symptom of this situation is the following log message:
- ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000077: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S). Cancelling subscription, and restarting.
b. This release has more diagnostic log messages to identify issues around a congested SchedulerThreadPoolExecutor:
i. FanOutRecordsPublisher WARN log indicating a high time (over 11 seconds) taken by SchedulerThreadPoolExecutor to deliver an event to the ShardConsumer. Ideally, delivery time should be less than a second. If this is consistently high, refer to section c.
- WARN s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000077: Record delivery time to shard consumer is high at 14534 millis. Check the ExecutorStateEvent logs to see the state of the executor service. Also check if the RecordProcessor's processing time is high.
ii. FanOutRecordsPublisher DEBUG log to check the current event delivery time by SchedulerThreadPoolExecutor. Ideally, this should be less than a second. If this is consistently high, refer to section c.
- DEBUG s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000077: Record delivery time to shard consumer is at 401 millis
iii. ExecutorStateEvent INFO log emitted every 30 seconds to check if activeThreads is consistently high. activeThreads is considered high if it is more than 2x the number of worker leases. If this is consistently high, refer to section c.
- INFO s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=450, coreThreads=0, leasesOwned=64, largestPoolSize=520, maximumPoolSize=2147483647)
c. Customers are expected to assess the state of the SchedulerThreadPoolExecutor using the above diagnostic log messages and take appropriate mitigations such as reducing the RecordProcessor's processRecords time or scaling out the application (a rough sketch of one way to keep processRecords fast follows this case).
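As a sketch of the "reduce processRecords time" idea (plain JDK code, not a KCL API), one option is to hand each batch to a small bounded executor so the KCL thread returns quickly, with CallerRunsPolicy providing back pressure when the workers fall behind. BoundedBatchWorker and handle are hypothetical names; if you do this, be careful to checkpoint only sequence numbers whose processing has actually completed.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class BoundedBatchWorker {
    // Small pool + bounded queue: when the queue is full, CallerRunsPolicy makes the
    // submitting (KCL) thread do the work itself, which naturally applies back pressure.
    private final ThreadPoolExecutor workers = new ThreadPoolExecutor(
            4, 4, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(16),
            new ThreadPoolExecutor.CallerRunsPolicy());

    // Called from processRecords(); returns quickly unless the queue is saturated.
    public void submitBatch(List<KinesisClientRecord> batch) {
        workers.execute(() -> {
            for (KinesisClientRecord record : batch) {
                handle(record); // hypothetical: your business logic
            }
        });
    }

    private void handle(KinesisClientRecord record) {
        // placeholder for per-record processing
    }
}
```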
Case 3: All customers of KCL 2.x prior to the 2.2.2 release were in a blind spot regarding throttling or any other exceptions from CloudWatch metrics publish calls. These exceptions are now made visible, and we expect customers to take appropriate actions such as increasing the CloudWatch Put API TPS to fix the throttling issue or increasing the concurrent connections of the CloudWatch client to fix the limited-connections issue.
a. Increasing the concurrency of the client (a fuller wiring sketch follows this list) - CloudWatchAsyncClient.builder().region(region).httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE)).build();
b. CloudWatch limit increase - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html
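For completeness, here is a minimal sketch (an illustration, not part of the release notes) of passing such a higher-concurrency CloudWatch client to the KCL through ConfigsBuilder. The stream name, application name, region, and the record-processor factory parameter are placeholders you would replace with your own values.

```java
import java.util.UUID;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class KclClientWiring {

    // "my-stream" and "my-app" are placeholder names; pass in your own processor factory.
    public static ConfigsBuilder configsWithTunedCloudWatch(ShardRecordProcessorFactory factory) {
        Region region = Region.US_EAST_1; // assumption: adjust to your region

        // Raise the Netty HTTP client's max concurrency so metrics publishing
        // is not starved of connections (the fix suggested in Case 3a above).
        CloudWatchAsyncClient cloudWatch = CloudWatchAsyncClient.builder()
                .region(region)
                .httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE))
                .build();

        KinesisAsyncClient kinesis = KinesisAsyncClient.builder().region(region).build();
        DynamoDbAsyncClient dynamo = DynamoDbAsyncClient.builder().region(region).build();

        // The custom CloudWatch client is handed to the KCL through ConfigsBuilder,
        // so the MetricsConfig built from it publishes with the tuned client.
        return new ConfigsBuilder("my-stream", "my-app", kinesis, dynamo, cloudWatch,
                UUID.randomUUID().toString(), factory);
    }
}
```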
Thanks @ashwing. Not sure if this is the right place, but we're running amazon-kinesis-client-python in a container on a Fargate cluster. We received all of the errors from Case 2, and it sounds like we need to scale out. However, my understanding is that threads get created per shard, so adding additional tasks would be incorrect (and possibly do nothing). So, to scale out, is that just adding additional memory and CPU to the task? Thanks
*Edit: an interesting note, the container that's getting the error is a Red Hat Atomic container (made by the infrastructure team). I tried to replicate it in my ubuntu:18.04 container and cannot reproduce it.
Any update on this? Decreasing batch size / max records is not possible with enhanced fan-out. We need this as well, and I see it in several issues.
Took some time trying to figure this out in my app as well. It turns out that reducing how often checkpointing was done resolved my issue. I was checkpointing after each batch; after reducing it to about once a minute, the errors cleared up and it ran more smoothly.
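For anyone wanting to copy that approach, here is a minimal sketch (an illustration, assuming the KCL 2.x checkpointer API) of time-based checkpointing called from the end of processRecords; ThrottledCheckpointing and CHECKPOINT_INTERVAL are hypothetical names for the tuning knob.

```java
import java.time.Duration;
import java.time.Instant;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;

public class ThrottledCheckpointing {
    private static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(1);
    private Instant nextCheckpoint = Instant.now().plus(CHECKPOINT_INTERVAL);

    // Call this at the end of processRecords() instead of checkpointing every batch.
    void maybeCheckpoint(ProcessRecordsInput input) {
        if (Instant.now().isBefore(nextCheckpoint)) {
            return; // too soon; skip checkpointing for this batch
        }
        try {
            input.checkpointer().checkpoint(); // checkpoint at the last delivered record
            nextCheckpoint = Instant.now().plus(CHECKPOINT_INTERVAL);
        } catch (Exception e) {
            // Lease may have been lost or the processor is shutting down; safe to retry
            // on the next interval rather than fail the processor here.
        }
    }
}
```

Note that with interval-based checkpointing, a restart can replay up to one interval's worth of records, so processing should be idempotent.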
We are in the process of upgrading several consumers to KCL 2.0. They are attached to a somewhat large stream (thousands of shards) and have been running on KCL 1.x for a long time without issue.
Today we ran into the following exception which caused some shards not to be consumed:
We've seen this exception before right after deploys, but it usually disappears within 30 minutes. Today it started happening during normal operation, and it caused some shards not to be consumed at all. The max statistic of the SubscribeToShardEvent.MillisBehindLatest metric just kept increasing.
We are running on the latest commit (f52f2559ed) of the master branch. Any idea what could be happening?
Edit: should probably also mention that we let it run like this for over 2 hours and it never recovered. We've had to revert everything back to the old KCL.