awslabs / amazon-kinesis-connector-flink

This is a fork of the Apache Flink Kinesis connector that adds Enhanced Fan-Out (EFO) support for Flink 1.8/1.11 on Kinesis Data Analytics (KDA).
Apache License 2.0

Getting IOException/StacklessClosedChannelException when consuming data from Kinesis #51

Open leonxu8 opened 2 years ago

leonxu8 commented 2 years ago

I am using the Flink 1.12.5 Kinesis connector to consume data from Kinesis. Flink 1.12.5 should include all the latest features from the amazon-kinesis-connector-flink repo.

From time to time I get an IOException or a StacklessClosedChannelException. After these errors occur 10 times (the default retry limit), the Flink source operator fails and the entire job restarts. I have two questions:

  1. Is there a better way to handle these errors without restarting the Flink job? Restarting the job is time-consuming and often slows down data consumption.
  2. These two errors are treated as retryable exceptions rather than recoverable exceptions. Are they really not recoverable without restarting the Flink source operator?
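To make the retryable-versus-recoverable distinction in question 2 concrete, here is a simplified, hypothetical model of a retry budget. It is not the connector's code: the class `RetryBudget`, its `ErrorKind` values, and the rule that a successful subscription resets the counter are assumptions made for illustration. In this model, retryable errors consume the budget and fail the source once it is exhausted, while recoverable errors are retried without consuming it.

```java
/**
 * Hypothetical, simplified model of a per-shard retry budget.
 * NOT the Flink connector's implementation; names and reset rule are assumptions.
 */
final class RetryBudget {
    /** Assumed error categories for this model only. */
    enum ErrorKind { RECOVERABLE, RETRYABLE, FATAL }

    private final int maxRetries;
    private int attempts = 0;

    RetryBudget(int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        this.maxRetries = maxRetries;
    }

    /**
     * Returns true if the caller should back off and resubscribe,
     * false if the source should fail (which would restart the job).
     */
    boolean onError(ErrorKind kind) {
        switch (kind) {
            case RECOVERABLE:
                // Retried indefinitely; does not consume the budget.
                return true;
            case RETRYABLE:
                // Consumes one unit of budget; fails once the budget is used up.
                if (attempts >= maxRetries) {
                    return false;
                }
                attempts++;
                return true;
            default:
                // Anything else is treated as fatal in this model.
                return false;
        }
    }

    /** Assumption: a successful subscription resets the consecutive-error count. */
    void onSuccess() {
        attempts = 0;
    }

    int attempts() {
        return attempts;
    }
}
```

With `new RetryBudget(10)`, ten consecutive retryable errors are tolerated and the next one returns `false`; under this model, reclassifying an error as recoverable is what would keep it from ever failing the source.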

Here are the exceptions:

org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$RetryableFanOutSubscriberException: org.apache.flink.kinesis.shaded.io.netty.channel.StacklessClosedChannelException
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.handleError(FanOutShardSubscriber.java:296) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:363) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154) [blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]

or

org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$RetryableFanOutSubscriberException: java.io.IOException: An error occurred on the connection: null
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.handleError(FanOutShardSubscriber.java:296) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:363) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154) [blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
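One possible stopgap for question 1 is to raise the SubscribeToShard retry limit so that transient connection errors like these are less likely to exhaust it. The sketch below only builds a `java.util.Properties` object. The key names are my understanding of Flink 1.12's `ConsumerConfigConstants` and should be checked against your Flink version; the region, consumer name, and backoff values are placeholders, and `EfoConsumerConfig` is a hypothetical helper. This does not address the underlying connection errors.

```java
import java.util.Properties;

/** Hypothetical helper that builds EFO consumer properties with a larger retry budget. */
final class EfoConsumerConfig {
    // Key names as I understand Flink 1.12's ConsumerConfigConstants /
    // AWSConfigConstants; verify them against the version you run.
    static final String AWS_REGION = "aws.region";
    static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
    static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";
    static final String SUBSCRIBE_TO_SHARD_MAX_RETRIES = "flink.shard.subscribetoshard.maxretries";
    static final String SUBSCRIBE_TO_SHARD_BACKOFF_BASE = "flink.shard.subscribetoshard.backoff.base";
    static final String SUBSCRIBE_TO_SHARD_BACKOFF_MAX = "flink.shard.subscribetoshard.backoff.max";

    private EfoConsumerConfig() {}

    static Properties build(String region, String consumerName, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        // Use Enhanced Fan-Out instead of polling.
        props.setProperty(RECORD_PUBLISHER_TYPE, "EFO");
        props.setProperty(EFO_CONSUMER_NAME, consumerName);
        // Larger retry budget before the source operator gives up.
        props.setProperty(SUBSCRIBE_TO_SHARD_MAX_RETRIES, Integer.toString(maxRetries));
        // Illustrative backoff values in milliseconds, not recommendations.
        props.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_BASE, "1000");
        props.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_MAX, "10000");
        return props;
    }
}
```

The resulting properties would then be passed to the consumer, for example `new FlinkKinesisConsumer<>(streamName, schema, props)`.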

leonxu8 commented 2 years ago

@dannycranmer I'm not sure whether this is still the right place to post this question. I also sent it to the Flink user mailing list.