I'm using 'software.amazon.kinesis:amazon-kinesis-connector-flink:2.1.0', and it works fine with the polling strategy. However, when I switched to EFO (I tried the Lazy, Eager, and None registration types), I ran into strange behavior that differs from what Flink describes.
I found that the consumer ARN had already been deleted. I think it was deleted during the teardown state, and I then saw the error below.
If the consumer ARN has been deleted, why does the Kinesis connector keep throwing this error over and over, causing Flink to restart forever?
It works fine for the first 5 minutes, then everything goes down after the subscription expires and I get the errors below. It seems that a new consumer ARN is created and then deleted by some Flink subtask. I set parallelism to 1 and max parallelism to 5. I only have 1 consumer ARN. Could you please check the information below?
The Lazy, Eager, and None registration types all show similar behavior: they work only for the first 5 minutes.
With Lazy and Eager, the consumer ARN is recreated over and over again.
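For reference, here is a minimal sketch of the kind of EFO consumer configuration described above. It uses plain string property keys so that it runs without the connector on the classpath. The key names are assumptions taken from my reading of the Flink Kinesis connector documentation and should be checked against the connector's ConsumerConfigConstants. The class and method names (EfoConfigSketch, efoProperties) are hypothetical, and region and credential settings are left out.

```java
import java.util.Properties;

public class EfoConfigSketch {

    /**
     * Builds consumer properties for an EFO source.
     * registrationType is expected to be "LAZY", "EAGER", or "NONE".
     * Key names are assumed from the Flink Kinesis connector docs.
     */
    static Properties efoProperties(String streamName,
                                    String consumerName,
                                    String registrationType,
                                    String consumerArn) {
        Properties props = new Properties();
        // Switch the record publisher from polling to enhanced fan-out.
        props.setProperty("flink.stream.recordpublisher", "EFO");
        props.setProperty("flink.stream.efo.registration", registrationType);

        if ("NONE".equals(registrationType)) {
            // With NONE the connector does not register a consumer itself,
            // so an existing consumer ARN has to be supplied per stream.
            if (consumerArn == null) {
                throw new IllegalArgumentException("NONE requires a consumer ARN");
            }
            props.setProperty("flink.stream.efo.consumerarn." + streamName, consumerArn);
        } else {
            // With LAZY or EAGER the connector registers a consumer under this name.
            props.setProperty("flink.stream.efo.consumername", consumerName);
        }
        return props;
    }

    public static void main(String[] args) {
        // Stream and consumer names are the ones that appear in the error message below.
        Properties props = efoProperties(
                "a204892-msg-dist-kinesis-Development",
                "a204892-msg-dist-kinesis-Development-efo",
                "LAZY",
                null);
        props.list(System.out);
    }
}
```

The resulting Properties object would then be passed to the Kinesis consumer constructor together with the stream name and a deserialization schema.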
source.class: org.apache.flink.runtime.executiongraph.Execution
source.file: Execution.java
error message: org.apache.flink.util.SerializedThrowable: Consumer a204892-msg-dist-kinesis-Development-efo with creation timestamp 1628505164 under stream: a204892-msg-dist-kinesis-Development, account xxxxxxxx not found. (Service: Kinesis, Status Code: 400, Request ID: f9d255ac-c807-2612-a059-52b762f25abf, Extended Request ID: 7aemhASnXRXJCIXhKLX5QPYK+gd5IOEJPjFdC+P7c2LnBVU3evjhqgVQxHt290qi6cjfUMKrhftB0VENwhlXAH99nAA0LsRz)
stacktrace:
at software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException$BuilderImpl.build(ResourceNotFoundException.java:117) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException$BuilderImpl.build(ResourceNotFoundException.java:77) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.unmarshall(AwsJsonProtocolErrorUnmarshaller.java:89) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:63) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:42) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.lambda$handle$0(MetricCollectingHttpResponseHandler.java:52) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:64) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.handle(MetricCollectingHttpResponseHandler.java:52) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:89) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at software.amazon.kinesis.shaded.software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:132) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:181) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$500(ResponseHandler.java:71) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:298) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.complete(HandlerPublisher.java:447) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.handlerRemoved(HandlerPublisher.java:435) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.callHandlerRemoved(AbstractChannelHandlerContext.java:946) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:637) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:477) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:423) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:362) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:223) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:199) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:173) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.LastHttpContentHandler.channelRead(LastHttpContentHandler.java:43) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.onDataRead(Http2ToHttpInboundAdapter.java:86) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:49) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:42) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.doRead0(AbstractHttp2StreamChannel.java:901) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.doBeginRead(AbstractHttp2StreamChannel.java:816) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.beginRead(AbstractHttp2StreamChannel.java:785) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.read(DefaultChannelPipeline.java:1362) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeRead(AbstractChannelHandlerContext.java:686) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.read(AbstractChannelHandlerContext.java:671) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.FlushOnReadHandler.read(FlushOnReadHandler.java:40) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeRead(AbstractChannelHandlerContext.java:686) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.read(AbstractChannelHandlerContext.java:671) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.requestDemand(HandlerPublisher.java:130) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler$1.requestDemand(HttpStreamsHandler.java:191) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.flushBuffer(HandlerPublisher.java:345) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.receivedDemand(HandlerPublisher.java:291) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.access$200(HandlerPublisher.java:61) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher$ChannelSubscription$1.run(HandlerPublisher.java:495) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at software.amazon.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[msg-dist-cd.jar:1.0-SNAPSHOT]
at java.lang.Thread.run(Unknown Source) ~[?:?]
The problem was solved once I switched to the Apache Flink Kinesis connector from the official website. I do not recommend this library to anyone who wants to use the Lazy or Eager registration type for Kinesis EFO. Otherwise you will run into strange problems and Flink will keep restarting, because this library does not behave the way Flink describes.