awslabs / amazon-kinesis-client

Client library for Amazon Kinesis
Apache License 2.0

Uncaught Netty exceptions on high volume data stream #1315

Open michaelzelaia opened 2 months ago

michaelzelaia commented 2 months ago

Hello!

We are using KCL 2.5.8 in a Java client that runs in a Docker container. The Docker image is Oracle Linux 7 slim and the JRE is Adoptium Temurin 21.0.2. We had been fetching data from a low-volume stream for quite some time without any issues. We recently connected a new worker (our client process is multi-threaded and we use the single-stream interface, so each of our client threads is a worker that fetches from a single stream) to a new, higher-volume stream that has a significant backlog of data. This stream is configured to auto-scale, and as of now it has 168 shards based on what I've seen in the DynamoDB lease table.
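
To be concrete, each consumer thread runs its own Scheduler, along the lines of the standard pattern (simplified; buildSchedulerFor stands in for the Scheduler construction shown further down):

    // Simplified: one Scheduler per stream, each on its own thread (Scheduler implements Runnable).
    // buildSchedulerFor(...) is a placeholder for the Scheduler construction shown further down.
    Scheduler scheduler = buildSchedulerFor(streamName);
    Thread schedulerThread = new Thread(scheduler);
    schedulerThread.setDaemon(true);
    schedulerThread.start();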

At first it looked OK, but after a bit we started getting a lot of Netty errors like the following:

[io.netty.channel.DefaultChannelPipeline]: [WARN] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
 java.io.IOException: An error occurred on the connection: java.nio.channels.ClosedChannelException, [channel: ef754599]. All streams will be closed
    at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.decorateConnectionException(MultiplexedChannelRecord.java:213)
    at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeChildChannels$10(MultiplexedChannelRecord.java:205)
    at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeAndExecuteOnChildChannels$11(MultiplexedChannelRecord.java:229)
    at software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop(NettyUtils.java:248)
    at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeAndExecuteOnChildChannels(MultiplexedChannelRecord.java:220)
    at software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeChildChannels(MultiplexedChannelRecord.java:205)
    at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:353)
    at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:333)
    at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.access$200(Http2MultiplexedChannelPool.java:76)
    at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.closeAndReleaseParent(
    at software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.channelInactive(
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:206)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at software.amazon.awssdk.http.nio.netty.internal.http2.Http2PingHandler.channelInactive(Http2PingHandler.java:77)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
    at io.netty.handler.codec.http2.Http2ConnectionHandler.channelInactive(Http2ConnectionHandler.java:430)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
    at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1174)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Unknown Source)
 Caused by: java.nio.channels.ClosedChannelException: null
 ... 41 common frames omitted
[s.a.k.r.p.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon]: [ERROR] stream-name:shardId-000000000202 :  Exception thrown while fetching records from Kinesis
 software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate.
 Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate.
 Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout.
 If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests.
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:47)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:223)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:218)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:182)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$execute$0(MakeAsyncHttpRequestStage.java:108)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:255)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:167)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
 Caused by: java.lang.Throwable: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate.
 Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate.
 Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout.
 If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests.
at software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.decorateException(NettyUtils.java:69)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:307)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:188)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
at software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
at io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:503)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
... 1 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Acquire operation took longer than 10000 milliseconds.
at software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.timeoutAcquire(HealthCheckedChannelPool.java:77)
at software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$acquire$0(HealthCheckedChannelPool.java:67)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
... 7 common frames omitted

We first tried a custom httpClientBuilder for the Kinesis async client (NettyNioAsyncHttpClient.Builder) with custom maxConcurrency, maxPendingConnectionAcquires, connectionTimeout and connectionAcquisitionTimeout values, to no avail (a rough sketch of that configuration follows the questions below). We then realised the machine was CPU-bound and that was the main issue: parsing was taking far too long at some points because the CPU had to juggle too many threads. So we decided to spin up multiple EC2 machines to share the workload. Processing a batch of messages now takes on the order of hundreds of milliseconds instead of consistently taking several seconds. While the mini-fleet has been able to quickly plough through the backlog and catch up, I can still see a few of these errors occurring while processing the backlog, and I have a few questions in this regard:

  1. What exactly are these errors and why are they happening? My hunch is the odd temporary CPU overload, as CloudWatch has shown the CPU going over 90% a few times, but at the same time I find it odd that the client becomes unstable every time the CPU is overloaded.
  2. Is there any way we can avoid these errors? Maybe by throttling consumption on the client side somehow? I think our Scheduler setup is pretty "standard", as I used the AWS documentation as a reference, but maybe I'm missing a parameter I could customise (see the PollingConfig sketch after the Scheduler code below).
  3. Do these errors have any repercussion on the data? i.e. when they happen, is there potential data loss, or is it just a failed request that gets retried later?
  4. I've tried enabling DEBUG and looking through the logs, but it's just too much to go through without something specific to look for. If I could get hints on what to look for, I could maybe provide more details.
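
For reference, the custom HTTP client attempt mentioned above looked roughly like this (the values are illustrative rather than our exact settings, and the client is built directly here instead of via KinesisClientUtil so that its HTTP client defaults do not get applied on top):

    // Illustrative values only, not our exact settings.
    // Imports: NettyNioAsyncHttpClient (software.amazon.awssdk.http.nio.netty) and java.time.Duration.
    KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
        .region(region)
        .credentialsProvider(kclCredentialsProvider)
        .httpClientBuilder(
            NettyNioAsyncHttpClient.builder()
                .maxConcurrency(500)                  // raised for the larger number of shards
                .maxPendingConnectionAcquires(10_000)
                .connectionTimeout(Duration.ofSeconds(30))
                // The 10 000 ms in the TimeoutException above is the acquire timeout being hit.
                .connectionAcquisitionTimeout(Duration.ofSeconds(60)))
        .build();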

For reference, this is how we build the Scheduler and the required clients:

    KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
          KinesisAsyncClient.builder().region(region).credentialsProvider(kclCredentialsProvider));

    DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder()
          .region(region)
          .credentialsProvider(dynamoDbCredentialsProvider)
          .build();

    CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder()
          .credentialsProvider(dynamoDbCredentialsProvider)
          .region(region)
          .build();

    ConfigsBuilder configsBuilder =
        new ConfigsBuilder(
            new SingleStreamTracker(
                streamName, InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON)),
            kinesisConfig.getAppName() + "-" + streamName,
            kinesisClient,
            dynamoClient,
            cloudWatchClient,
            fullWorkerId,
            recordProcFactory);

    // Publish metrics to CloudWatch only when metrics are enabled in our config;
    // otherwise fall back to a no-op metrics factory.
    MetricsConfig metricsConfig =
        kinesisConfig.isMetricsEnabled()
            ? configsBuilder.metricsConfig()
            : configsBuilder.metricsConfig().metricsFactory(new NullMetricsFactory());

    // Coordinator config is customised elsewhere; retrieval uses polling (GetRecords)
    // rather than enhanced fan-out.
    return new Scheduler(
        configsBuilder.checkpointConfig(),
        createCustomCoordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        metricsConfig,
        configsBuilder.processorConfig(),
        configsBuilder
            .retrievalConfig()
            .retrievalSpecificConfig(
                new PollingConfig(kinesisConfig.getStreamName(), kinesisClient)));
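
Regarding question 2: would tuning the PollingConfig be the intended way to throttle this on the client side? For example, something along these lines (the values are purely illustrative, and I have not verified these are the right knobs):

    // Purely illustrative values; is this the intended way to slow polling down?
    PollingConfig pollingConfig = new PollingConfig(kinesisConfig.getStreamName(), kinesisClient);
    pollingConfig.maxRecords(1000);                      // fewer records per GetRecords call
    pollingConfig.idleTimeBetweenReadsInMillis(1_000L);  // longer pause between polls on each shard
    // ... then passed to retrievalSpecificConfig(pollingConfig) as in the Scheduler above.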

This is our RecordProcessor:

    public class KinesisRecordProcessor implements ShardRecordProcessor {
      [...]

      @Override
      public void initialize(InitializationInput initializationInput) {
        logger.info(
            "============== Initialise ShardId => {}, Sequence => {}, Stream => {}",
            initializationInput::shardId,
            initializationInput::extendedSequenceNumber,
            streamRecordTracker::getStreamName);
        lastCheckpoint = System.currentTimeMillis();
      }

      @Override
      public void processRecords(ProcessRecordsInput processRecordsInput) {
        long startRecs = System.currentTimeMillis();
        logger.info(
            "============== Processing => [{}] records from Stream {}",
            processRecordsInput.records().size(),
            streamRecordTracker.getStreamName());
        logger.info(
            "Shard is behind by [{} seconds] [{}].",
            () -> (processRecordsInput.millisBehindLatest() / 1_000L),
            () ->
                DurationFormatUtils.formatDurationWords(
                    processRecordsInput.millisBehindLatest(), true, true));
        long startParse = System.currentTimeMillis();
        List<KinesisMessage> parsedMessages = parseRecords(processRecordsInput.records());
        long endParse = System.currentTimeMillis();
        streamRecordTracker.addAll(parsedMessages);
        checkpointIfNeeded(System.currentTimeMillis(), processRecordsInput);
        logger.info(
            "============== Loaded => [{}] records in {} millis. Parsing took {} millis.",
            parsedMessages.size(),
            (System.currentTimeMillis() - startRecs),
            (endParse - startParse));
      }

      @Override
      public void leaseLost(LeaseLostInput leaseLostInput) {
        logger.info("Lease [{}] lost. This is just an informative message.", leaseLostInput);
      }

      @Override
      public void shardEnded(ShardEndedInput shardEndedInput) {
        streamRecordTracker.checkpoint(shardEndedInput.checkpointer());
        logger.info("Shard [{}] ended. This is just an informative message.", shardEndedInput);
      }

      @Override
      public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        streamRecordTracker.checkpoint(shutdownRequestedInput.checkpointer());
        logger.info("Shutting KinesisRecordProcessor [{}] down now.", shutdownRequestedInput);
      }
    }
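
For completeness, checkpointIfNeeded is a simple time-based guard, roughly equivalent to the following (simplified sketch; the interval and error handling are illustrative, not our exact code):

    private static final long CHECKPOINT_INTERVAL_MILLIS = 60_000L; // illustrative interval

    private void checkpointIfNeeded(long now, ProcessRecordsInput processRecordsInput) {
      if (now - lastCheckpoint < CHECKPOINT_INTERVAL_MILLIS) {
        return;
      }
      try {
        // Checkpoint at the last record delivered to processRecords so far.
        processRecordsInput.checkpointer().checkpoint();
        lastCheckpoint = now;
      } catch (ShutdownException | InvalidStateException e) {
        logger.error("Checkpoint failed", e);
      }
    }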

I can provide other bits if needed.

Thank you very much for your help in advance.

mathew-sabu commented 1 month ago

Hello, I'm also seeing many warnings like the first one posted in this issue in my KCL application. Could this affect the consumption or processing of data in any way?

michaelzelaia commented 1 month ago

It's not clear whether there is data loss as a result of these exceptions (I did not confirm it either way). I did find that when they happened constantly the application would eventually throw OOM exceptions and crash, so data loss could well happen, and it will of course slow consumption down.

In our case it was definitely a case of auto-scaling drowning the EC2 instance the client was running on (over 250 shards, each shard bringing its own handful of threads), so I would recommend keeping an eye on CPU usage unless you're running serverless. Once the client was up to speed these issues would cease. However, we decided to set the stream to a fixed number of shards to avoid this kind of problem in the future.