awslabs / amazon-kinesis-client

Client library for Amazon Kinesis
Apache License 2.0

KCL 2.2.x How to decrease/override the batch size fetched by each shard processor #569

Open abazeem opened 5 years ago

abazeem commented 5 years ago

I am running a Kinesis consumer application with EnhancedFanOut on KCL 2.2.x. I keep getting the ReadTimeout exception below:

software.amazon.kinesis.lifecycle.ShardConsumerSubscriber: shardId-000000000039: onError(). Cancelling subscription, and marking self as failed. KCL will recreate the subscription as necessary to continue processing. If you are seeing this warning frequently consider increasing the SDK timeouts by providing an OverrideConfiguration to the kinesis client. Alternatively you can configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppress intermittent ReadTimeout warnings.

This is probably because my processing takes more than 35 seconds (the default timeout). I would like to either increase the timeout or reduce the batch size so that my processing completes without getting timed out.

The default batchSize is 10000. My processing fails when a batch exceeds 8000 records.
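
For reference, the "increase the SDK timeouts" part of that warning can be addressed when building the KinesisAsyncClient that is handed to the Scheduler. A minimal sketch, assuming the AWS SDK v2 Netty async HTTP client; the factory method name and the 60-second value are illustrative, not a recommendation:

    import java.time.Duration;

    import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

    public class KinesisClientFactory {
        // Builds a Kinesis client whose HTTP read timeout is raised above the
        // SDK default, which is the timeout the ReadTimeout warning refers to.
        public static KinesisAsyncClient withLongerReadTimeout(Region region) {
            return KinesisAsyncClient.builder()
                    .region(region)
                    .httpClientBuilder(NettyNioAsyncHttpClient.builder()
                            // high concurrency matches what KCL configures for fan-out consumers
                            .maxConcurrency(Integer.MAX_VALUE)
                            // illustrative value; set it above your processRecords duration
                            .readTimeout(Duration.ofSeconds(60)))
                    .build();
        }
    }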

ashwing commented 5 years ago
What to expect from KCL release 2.2.2?

Case 1: If you are using a KCL 2.x version (prior to KCL 2.2.2) and you replaced or 
modified the thread/queue configuration of SchedulerThreadPoolExecutor and faced situations 
like data loss, shards getting stuck, or intermittent pauses in data consumption, please 
continue reading; otherwise go to the next case. These issues are potentially due to undelivered 
executor service events.
    a.  This release ensures data loss does not happen by preventing subsequent events 
        from being delivered after an event delivery failure and later restarting the subscription. 
        This may, however, lead to intermittent pauses in data consumption, as KCL checks for 
        exceptionally completed subscriptions every 35 seconds. Refer to section d for mitigation. 
        This scenario is identified by the log messages,
            - ERROR s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000035: Received unexpected ack for the active subscription shardId-000000000077-005. Throwing.
            - ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000035: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S).  Cancelling subscription, and restarting.
    b.  This release prevents the shard-stuck situations that happened due to undelivered 
        control plane messages. [NOTE: Control plane messages are responsible for fetching 
        further events from the KDS service. This includes the subscribeToShard() API call and the 
        reactive layer's request(N) calls.]
    c.  This release has improved logging that captures undelivered control plane 
        messages for troubleshooting. Any undelivered control plane message might still lead 
        to a temporary pause in data consumption. Refer to section d for mitigation. This is identified 
        by the log message,
            - ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000035: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S).  Cancelling subscription, and restarting.
    d.  Mitigation for undelivered messages: The undelivered messages are primarily due to 
        reduced SchedulerThreadPoolExecutor capacity. Customers are expected to assess 
        the state of the SchedulerThreadPoolExecutor using the following diagnostic log messages 
        and take appropriate actions, such as reducing the RecordProcessor.processRecords time, 
        scaling out the application, or increasing the number of threads in the executor 
        (see the sketch after this list).
            i.  ExecutorStateEvent INFO log emitted every 30 seconds to check if activeThreads and 
                queueSize are consistently high.
                    - INFO s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=100, coreThreads=0, leasesOwned=40, largestPoolSize=2, maximumPoolSize=2147483647)
            ii. RejectedTaskEvent ERROR log emitted when SchedulerThreadPoolExecutor fails to 
                execute any event
                    - ERROR s.a.k.c.DiagnosticEventLogger [NONE] - Review your thread configuration to prevent task rejections. Task rejections will slow down your application and some shards may stop processing. Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=0, largestPoolSize=0, maximumPoolSize=2147483647) - io.reactivex.exceptions.UndeliverableException: java.util.concurrent.RejectedExecutionException: Test exception.
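
To illustrate the "increase the number of threads in the executor" option in 1(d): the sketch below assumes your KCL 2.x version exposes the coordinatorFactory hook on CoordinatorConfig and the SchedulerCoordinatorFactory base class; the class name and pool size are illustrative and should be tuned against the ExecutorStateEvent output above.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import software.amazon.kinesis.coordinator.SchedulerCoordinatorFactory;

    // Sketch: back the scheduler with a larger, fixed-size pool instead of the
    // default SchedulerThreadPoolExecutor. Size the pool from ExecutorStateEvent logs.
    class LargerPoolCoordinatorFactory extends SchedulerCoordinatorFactory {
        @Override
        public ExecutorService createExecutorService() {
            return Executors.newFixedThreadPool(256); // illustrative size
        }
    }

    // Wiring it in through the standard ConfigsBuilder-produced config:
    // configsBuilder.coordinatorConfig().coordinatorFactory(new LargerPoolCoordinatorFactory());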

Case 2: If you are using a KCL 2.x version (prior to KCL 2.2.2) and you did NOT modify 
or replace SchedulerThreadPoolExecutor and faced shard-stuck situations or frequent 
intermittent pauses in data consumption, please continue reading; otherwise go to the next case. 
These issues are potentially due to submitting more tasks to SchedulerThreadPoolExecutor 
than it can handle, leading to delayed execution of submitted tasks.
    a.  The common symptom of this situation is the following log message,
            - ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000077: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S).  Cancelling subscription, and restarting.
    b.  This release has more diagnostic log messages to identify issues around a 
        congested SchedulerThreadPoolExecutor:
            i.  FanOutRecordsPublisher WARN log indicating a high time (over 11 seconds) taken 
                by SchedulerThreadPoolExecutor to deliver an event to ShardConsumer. 
                Ideally delivery time should be less than a second. If this is consistently 
                high, refer to section c.
                    - WARN  s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000077: Record delivery time to shard consumer is high at 14534 millis. Check the ExecutorStateEvent logs to see the state of the executor service. Also check if the RecordProcessor's processing time is high.
            ii. FanOutRecordsPublisher DEBUG log to check the current event delivery time 
                by SchedulerThreadPoolExecutor. Ideally this should be less than a second. 
                If this is consistently high, refer to section c.
                    - DEBUG  s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000077: Record delivery time to shard consumer is at 401 millis
            iii. ExecutorStateEvent INFO log emitted every 30 seconds to check if activeThreads 
                is consistently high. activeThreads is considered high if it is more than 2x 
                the number of worker leases. If this is consistently high, refer to section c.
                    - INFO s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=450, coreThreads=0, leasesOwned=64, largestPoolSize=520, maximumPoolSize=2147483647)
    c.  Customers are expected to assess the state of the SchedulerThreadPoolExecutor using 
        the above diagnostic log messages and take appropriate mitigations, such as reducing the 
        RecordProcessor.processRecords time or scaling out the application.

Case 3: Before the 2.2.2 release, all KCL 2.x customers were in a blind spot regarding throttling or 
any other exception from CloudWatch metrics publish calls. These exceptions are now made visible, 
and we expect customers to take appropriate actions such as requesting a CloudWatch Put API TPS 
increase to fix the throttling issue, or increasing the concurrent connections of the CloudWatch client 
to fix the limited-connections issue.
    a.  Increasing the concurrency of the client (expanded in the sketch after this list) - CloudWatchAsyncClient.builder().region(region).httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE)).build();
    b.  CloudWatch limit increase - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html
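
A compilable form of the one-liner in 3(a), assuming the AWS SDK v2 Netty async client; the factory class and method names are only illustrative:

    import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;

    public class MetricsClientFactory {
        // CloudWatch client with a raised connection ceiling; pass it to
        // ConfigsBuilder as the metrics client. Integer.MAX_VALUE mirrors the
        // snippet in 3(a); a bounded value sized to your metrics volume also works.
        public static CloudWatchAsyncClient highConcurrencyCloudWatch(Region region) {
            return CloudWatchAsyncClient.builder()
                    .region(region)
                    .httpClientBuilder(NettyNioAsyncHttpClient.builder()
                            .maxConcurrency(Integer.MAX_VALUE))
                    .build();
        }
    }
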
ashwing commented 5 years ago

Also, when using EnhancedFanOut, batchSize has no effect; batchSize works only with polling readers.
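
For completeness, a sketch of where maxRecords applies when using polling retrieval, assuming the standard ConfigsBuilder setup; the method name and the 1000-record cap are illustrative:

    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.kinesis.common.ConfigsBuilder;
    import software.amazon.kinesis.retrieval.RetrievalConfig;
    import software.amazon.kinesis.retrieval.polling.PollingConfig;

    public class PollingRetrievalExample {
        // Switch from enhanced fan-out to polling retrieval so that the
        // per-GetRecords batch size can be capped via maxRecords.
        public static RetrievalConfig pollingWithSmallerBatches(
                ConfigsBuilder configsBuilder, String streamName, KinesisAsyncClient kinesisClient) {
            return configsBuilder.retrievalConfig()
                    .retrievalSpecificConfig(
                            new PollingConfig(streamName, kinesisClient).maxRecords(1000));
        }
    }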

zmaupinns1 commented 3 years ago
[Quotes ashwing's "What to expect from KCL release 2.2.2?" comment above in full.]

I see this message everywhere, but it's not helpful. I'm running amazon_kclpy 2.0.2 which uses 2.3.4 of the Java KCL. I'm seeing this issue as well with no option to set the timeout to something else.

royyeah commented 2 years ago

Any update on this? Since we want to rate limit in these scenarios, it can (by design) take longer to finish processRecords, but I can find no way to decrease maxRecords / batchSize or control the timeout.

charles-slc commented 11 months ago

I'd also like to know if it is possible to reduce the batch size or increase the timeout.

javierdiegof commented 9 months ago

Same issue. As soon as our workers start to fall behind, batches become very large and overload the hosts' CPUs, rendering them useless. This is a self-reinforcing loop that makes KCL on EFO brittle.