awslabs / amazon-kinesis-connector-flink

This is a fork of the Apache Flink Kinesis connector adding Enhanced Fanout support for Flink 1.8/1.11 on KDA.
Apache License 2.0

Confirm if `FanOutShardSubscriber#handleError` should perform `cause instanceof TimeoutException` instead of `ReadTimeoutException` #21

Closed bobtiernay-okta closed 3 years ago

bobtiernay-okta commented 3 years ago

I wanted to confirm that the following code:

    private void handleError(final Throwable throwable) throws FanOutSubscriberException {
        Throwable cause;
        if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
            cause = throwable.getCause();
        } else {
            cause = throwable;
        }

        LOG.warn("Error occurred on EFO subscription: {} - ({}).  {} ({})",
            throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn, cause);

        if (isInterrupted(throwable)) {
            throw new FanOutSubscriberInterruptedException(throwable);
        } else if (cause instanceof ReadTimeoutException) {
            // ReadTimeoutException occurs naturally under backpressure scenarios when full batches take longer to
            // process than standard read timeout (default 30s). Recoverable exceptions are intended to be retried
            // indefinitely to avoid system degradation under backpressure. The EFO connection (subscription) to Kinesis
            // is closed, and reacquired once the queue of records has been processed.
            throw new RecoverableFanOutSubscriberException(cause);
        } else {
            throw new RetryableFanOutSubscriberException(cause);
        }
    }

Shouldn't this be checking for `cause instanceof TimeoutException`, since `ReadTimeoutException` is a refinement of `TimeoutException`? In practice the following is observed:

[ sdk-async-response-3-4 ] [ WARN ] [ FanOutShardSubscriber ] - Error already queued. Ignoring subsequent exception.
java.util.concurrent.TimeoutException: Timed out enqueuing event SubscriptionNextEvent - shardId-000000000446 (arn:aws:kinesis:us-east-.*)
    at software.amazon.kinesis.connectors.flink.internals.publisher.fanout.FanOutShardSubscriber$FanOutShardSubscription.enqueueEvent(FanOutShardSubscriber.java:439) ~[blob_p-140d11e42e25dda76ffd55b702844d7dfd99d414-5c7c072c2306938044361f953ab304e8:0.9.0-b207f5d]
    at software.amazon.kinesis.connectors.flink.internals.publisher.fanout.FanOutShardSubscriber$FanOutShardSubscription.access$600(FanOutShardSubscriber.java:347) ~[blob_p-140d11e42e25dda76ffd55b702844d7dfd99d414-5c7c072c2306938044361f953ab304e8:0.9.0-b207f5d]
    at software.amazon.kinesis.connectors.flink.internals.publisher.fanout.FanOutShardSubscriber$FanOutShardSubscription$1.visit(FanOutShardSubscriber.java:380) ~[blob_p-140d11e42e25dda76ffd55b702844d7dfd99d414-5c7c072c2306938044361f953ab304e8:0.9.0-b207f5d]
    at software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent.accept(SubscribeToShardEvent.java:230) ~[blob_p-140d11e42e25dda76ffd55b702844d7dfd99d414-5c7c072c2306938044361f953ab304e8:0.9.0-b207f5d]
    at software.amazon.kinesis.connectors.flink.internals.publisher.fanout.FanOutShardSubscriber$FanOutShardSubscription.onNext(FanOutShardSubscriber.java:377) ~[blob_p-140d11e42e25dda76ffd55b702844d7dfd99d414-5c7c072c2306938044361f953ab304e8:0.9.0-b207f5d]
    at software.amazon.kinesis.connectors.flink.internals.publisher.fanout.FanOutShardSubscriber$FanOutShardSubscription.onNext(FanOutShardSubscriber.java:347) ~[blob_p-140d11e42e25dda76ffd55b702844d7dfd99d414-5c7c072c2306938044361f953ab304e8:0.9.0-b207f5d]
    at software.amazon.kinesis.shaded.software.amazon.awssdk.awscore.eventstream.EventStreamAsyncResponseTransformer.deliverEvent(EventStreamAsyncResponseTransformer.java:526) ~[blob_p-140d11e42e25dda76ffd55b702844d7dfd99d414-5c7c072c2306938044361f953ab304e8:0.9.0-b207f5d]
    at software.amazon.kinesis.shaded.software.amazon.awssdk.awscore.eventstream.EventStreamAsyncResponseTransformer.lambda$isCompletedOrDeliverEvent$5(EventStreamAsyncResponseTransformer.java:508) ~[blob_p-140d11e42e25dda76ffd55b702844d7dfd99d414-5c7c072c2306938044361f953ab304e8:0.9.0-b207f5d]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]

And the code comment of:

// ReadTimeoutException occurs naturally under backpressure scenarios when full batches take longer to
// process than standard read timeout (default 30s). Recoverable exceptions are intended to be retried
// indefinitely to avoid system degradation under backpressure. The EFO connection (subscription) to Kinesis
// is closed, and reacquired once the queue of records has been processed.

seems to suggest that ReadTimeout would fall into this category as well.

dannycranmer commented 3 years ago

Hello. I agree we could bucket the error handling for both exceptions. However, in this case the timeout behind the `TimeoutException` is longer than the one behind the `ReadTimeoutException`. The `ReadTimeoutException` would fire first and then be passed into the `handleError` method (once the current record batch is consumed). The `TimeoutException` is logged and discarded, as the [WARN] message "Error already queued. Ignoring subsequent exception." suggests.
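The "first error wins" behaviour described here can be illustrated with a minimal sketch. The class and method names (`FirstErrorWinsSketch`, `offerError`, `queued`) are hypothetical, and the single-slot `AtomicReference` is an assumption made for illustration; the connector's actual queueing code may differ.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not the connector's implementation:
// a single slot that keeps the first error and rejects later ones.
class FirstErrorWinsSketch {
    private final AtomicReference<Throwable> queuedError = new AtomicReference<>();

    // Returns true if this error was queued, false if an earlier error
    // already occupies the slot (the later error is dropped by the caller).
    boolean offerError(Throwable error) {
        return queuedError.compareAndSet(null, error);
    }

    // The error that will eventually be handed to the error handler, or null.
    Throwable queued() {
        return queuedError.get();
    }
}
```

Under this assumption, whichever timeout fires first decides which exception reaches `handleError`; any later exception is only logged.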

Therefore, yes, we can fix this; however, based on the logs provided, it would not make any functional difference.
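As a rough sketch of what bucketing both exceptions could look like, assuming the fix simply treats either timeout as recoverable: `SketchReadTimeoutException`, `ErrorBucketSketch`, `Bucket`, `unwrap`, and `classify` below are hypothetical stand-ins, not the connector's classes. The interruption check from `handleError` is omitted, and the null-cause guard is an addition of this sketch.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the ReadTimeoutException used by the connector.
class SketchReadTimeoutException extends RuntimeException {}

class ErrorBucketSketch {
    enum Bucket { RECOVERABLE, RETRYABLE }

    // Unwrap async wrapper exceptions as handleError does; this sketch
    // additionally falls back to the wrapper itself when the cause is null.
    static Throwable unwrap(Throwable throwable) {
        boolean wrapper = throwable instanceof CompletionException
                || throwable instanceof ExecutionException;
        if (wrapper && throwable.getCause() != null) {
            return throwable.getCause();
        }
        return throwable;
    }

    // Either timeout is treated as a backpressure symptom and bucketed as
    // recoverable; everything else stays in the retryable bucket.
    static Bucket classify(Throwable throwable) {
        Throwable cause = unwrap(throwable);
        if (cause instanceof SketchReadTimeoutException || cause instanceof TimeoutException) {
            return Bucket.RECOVERABLE;
        }
        return Bucket.RETRYABLE;
    }
}
```

Checking the two types explicitly, rather than relying on one being a subtype of the other, keeps the sketch correct regardless of how the real exception classes are related.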

bobtiernay-okta commented 3 years ago

Thanks for the insight. I think the change is still a good idea in general, in case that ordering changes in the future or doesn't hold 100% of the time (e.g. thread scheduling delay, context switching, etc.).

dannycranmer commented 3 years ago

Addressed in https://github.com/awslabs/amazon-kinesis-connector-flink/pull/28; it will be included in 2.0.3.