spring-cloud / spring-cloud-stream-binder-aws-kinesis

Spring Cloud Stream binder for AWS Kinesis
Apache License 2.0
99 stars 97 forks source link

Error after Kinesis re-sharding event using kinesis binder version 2.2.0 #187

Closed DanNemes closed 1 year ago

DanNemes commented 1 year ago

We are getting the following error during a kinesis re-sharding event (up or down, doesn't really matter). This does not affect the data processing (we can still process events from the live shards) but there are a lot of logs printed out and this might affect performance after performing many re-sharding events (wasted CPU time with processing closed shards).

{"@timestamp":"2022-02-01T09:05:08.814Z","@version":"1","message":"Got an exception com.amazonaws.services.kinesis.model.InvalidArgumentException: Invalid SequenceNumber for ShardIteratorType AFTER_SEQUENCE_NUMBER, SequenceNumber has reached max possible value for the shard. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ea337009-4622-e862-b041-8a0d394b017a; Proxy: null) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49623586295030033695175315773036414839057851295876513794', timestamp=null, stream='some-name-stream', shard='shardId-000000000000', reset=false}, state=NEW}] task invocation.\nProcess will be retried on the next iteration.","logger_name":"org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter","log_level":"INFO"}

artembilan commented 1 year ago

Let's see if you can upgrade your project to https://github.com/spring-projects/spring-integration-aws/releases/tag/v2.5.4! There are some fixes around closed shards and locks optimizations.

DanNemes commented 1 year ago

Thank you for your response! We will be looking in updating to 2.5.4 or a newer version. I know that there was an attempt in doing so but there were some issues. Any tips to look for when updating to that version?

artembilan commented 1 year ago

I'm not sure what is the question: you just go ahead and add this dependency into your POM:

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-aws</artifactId>
                        <version>2.5.4</version>
        </dependency>
DanNemes commented 1 year ago

I have just done a deployment using spring-integration-aws version 2.5.4 but with no luck. We still have the same problem.

We are using AWS EKS. When the pods are started for the first time (before the metadata state is saved to DynamoDB, we made sure to clear everything to have a clean state) everything is fine. The problem appears after the pods are restarted and the metadata is loaded from DynamoDB. At that point the library finds all the closed kinesis shards and tries to perform a checkpoint on that sequence number. We are using TRIM_HORIZON shard-iterator-type.

These are the logs:

{"@timestamp":"2023-02-07T19:41:03.302Z","@version":"1","message":"The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49637799291052687849931365558398561492554809994788208642', timestamp=null, stream='some-stream-name', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.","logger_name":"org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter","log_level":"INFO","thread_name":"-kinesis-consumer-1"}

{"@timestamp":"2023-02-07T19:41:03.323Z","@version":"1","message":"Got an exception com.amazonaws.services.kinesis.model.InvalidArgumentException: Invalid SequenceNumber for ShardIteratorType AFTER_SEQUENCE_NUMBER, SequenceNumber has reached max possible value for the shard. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: dc760f27-712d-1479-802a-a4a84882ff44; Proxy: null) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49637799291052687849931365558398561492554809994788208642', timestamp=null, stream='some-stream-name', shard='shardId-000000000000', reset=false}, state=NEW}] task invocation.\nProcess will be retried on the next iteration.","logger_name":"org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter","log_level":"INFO","thread_name":"-kinesis-consumer-1"}

artembilan commented 1 year ago

OK. Can you share, please, a stack trace to determine code lines involved in the problem? And that one should really point us to version we deal with, too.

DanNemes commented 1 year ago

It seems that I can't get the stack trace to be printed out and I already spent a lot of time trying to figure this one out. At the moment we will be deleting the stream and creating it again. In the future this will no longer be possible.

What I found was that the message: "The ShardConsumer has been started" appears every second. So my guess is that the current implementation is unable to figure out that the "NEW" shard is actually closed and mark it accordingly.

What other libraries would this issue be related to?

Below are some of the logs:

{"@timestamp":"2023-02-08T08:29:41.633Z","@version":"1","message":"The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49637799291052687849931365558398561492554809994788208642', timestamp=null, stream='some-stream-name', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.","logger_name":"org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter","log_level":"INFO","thread_name":"-kinesis-consumer-1"}

{"@timestamp":"2023-02-08T08:29:42.632Z","@version":"1","message":"The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49637799291052687849931365558398561492554809994788208642', timestamp=null, stream='some-stream-name', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.","logger_name":"org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter","log_level":"INFO","thread_name":"-kinesis-consumer-1"}

{"@timestamp":"2023-02-08T08:29:41.690Z","@version":"1","message":"Received error response: com.amazonaws.services.kinesis.model.InvalidArgumentException: Invalid SequenceNumber for ShardIteratorType AFTER_SEQUENCE_NUMBER, SequenceNumber has reached max possible value for the shard. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: d31115ce-da49-bbe3-8f4c-4a7b4574f796; Proxy: null)","logger_name":"com.amazonaws.request","log_level":"DEBUG","thread_name":"-kinesis-consumer-1"}

{"@timestamp":"2023-02-08T08:29:41.690Z","@version":"1","message":"Got an exception com.amazonaws.services.kinesis.model.InvalidArgumentException: Invalid SequenceNumber for ShardIteratorType AFTER_SEQUENCE_NUMBER, SequenceNumber has reached max possible value for the shard. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: d31115ce-da49-bbe3-8f4c-4a7b4574f796; Proxy: null) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49637799291052687849931365558398561492554809994788208642', timestamp=null, stream='some-stream-name', shard='shardId-000000000000', reset=false}, state=NEW}] task invocation.\nProcess will be retried on the next iteration.","logger_name":"org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter","log_level":"INFO","thread_name":"-kinesis-consumer-1"}

artembilan commented 1 year ago

OK. Any chances to see a dependencies report:

mvn dependency:tree

I want to see versions for Kinesis Binder and Spring Integration AWS.

DanNemes commented 1 year ago

Below is the dependency tree:

[INFO] Scanning for projects... [INFO] [INFO] --< >-- [INFO] Building 1.0.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- maven-dependency-plugin:3.2.0:tree (default-cli) @ --- [INFO] [INFO] +- [INFO] +- org.springframework.boot:spring-boot-starter-web:jar:2.6.9:compile [INFO] | +- org.springframework.boot:spring-boot-starter:jar:2.6.9:compile [INFO] | | +- org.springframework.boot:spring-boot:jar:2.6.9:compile [INFO] | | +- org.springframework.boot:spring-boot-autoconfigure:jar:2.6.9:compile [INFO] | | +- org.springframework.boot:spring-boot-starter-logging:jar:2.6.9:compile [INFO] | | | +- org.apache.logging.log4j:log4j-to-slf4j:jar:2.17.2:compile [INFO] | | | | - org.apache.logging.log4j:log4j-api:jar:2.17.2:compile [INFO] | | | - org.slf4j:jul-to-slf4j:jar:1.7.36:compile [INFO] | | +- jakarta.annotation:jakarta.annotation-api:jar:1.3.5:compile [INFO] | | +- org.springframework:spring-core:jar:5.3.21:compile [INFO] | | | - org.springframework:spring-jcl:jar:5.3.21:compile [INFO] | | - org.yaml:snakeyaml:jar:1.31:compile [INFO] | +- org.springframework.boot:spring-boot-starter-json:jar:2.6.9:compile [INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.14.0-rc1:compile [INFO] | | - com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.14.0-rc1:compile [INFO] | +- org.springframework.boot:spring-boot-starter-tomcat:jar:2.6.9:compile [INFO] | | +- org.apache.tomcat.embed:tomcat-embed-core:jar:9.0.69:compile [INFO] | | +- org.apache.tomcat.embed:tomcat-embed-el:jar:9.0.69:compile [INFO] | | - org.apache.tomcat.embed:tomcat-embed-websocket:jar:9.0.69:compile [INFO] | +- org.springframework:spring-web:jar:5.3.21:compile [INFO] | | - org.springframework:spring-beans:jar:5.3.21:compile [INFO] | - org.springframework:spring-webmvc:jar:5.3.21:compile [INFO] | +- org.springframework:spring-aop:jar:5.3.21:compile [INFO] | +- org.springframework:spring-context:jar:5.3.21:compile [INFO] | - org.springframework:spring-expression:jar:5.3.21:compile [INFO] +- org.springframework.boot:spring-boot-starter-actuator:jar:2.6.9:compile [INFO] | +- org.springframework.boot:spring-boot-actuator-autoconfigure:jar:2.6.9:compile [INFO] | | - org.springframework.boot:spring-boot-actuator:jar:2.6.9:compile [INFO] | - io.micrometer:micrometer-core:jar:1.8.7:compile [INFO] | +- org.hdrhistogram:HdrHistogram:jar:2.1.12:compile [INFO] | - org.latencyutils:LatencyUtils:jar:2.0.3:runtime [INFO] +- org.springframework.boot:spring-boot-configuration-processor:jar:2.6.9:compile (optional) [INFO] +- org.projectlombok:lombok:jar:1.18.24:compile [INFO] +- org.jooq:jool:jar:0.9.14:compile [INFO] +- [INFO] | +- [INFO] | | +- [INFO] | | | +- mysql:mysql-connector-java:jar:8.0.29:compile [INFO] | | | - org.apache.commons:commons-dbcp2:jar:2.9.0:compile [INFO] | | | - org.apache.commons:commons-pool2:jar:2.11.1:compile [INFO] | | +- [INFO] | | +- [INFO] | | | +- [INFO] | | | +- com.google.guava:guava:jar:30.1.1-jre:compile [INFO] | | | | +- com.google.guava:failureaccess:jar:1.0.1:compile [INFO] | | | | +- com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava:compile [INFO] | | | | +- com.google.code.findbugs:jsr305:jar:3.0.2:compile [INFO] | | | | +- org.checkerframework:checker-qual:jar:3.8.0:compile [INFO] | | | | +- com.google.errorprone:error_prone_annotations:jar:2.5.1:compile [INFO] | | | | - com.google.j2objc:j2objc-annotations:jar:1.3:compile [INFO] | | | +- org.json:json:jar:20180130:compile [INFO] | | | +- org.bouncycastle:bcprov-jdk15on:jar:1.59:compile [INFO] | | | - com.amazonaws:aws-java-sdk-kms:jar:1.12.292:compile [INFO] | | +- org.slf4j:slf4j-api:jar:1.7.36:compile [INFO] | | +- io.vertx:vertx-core:jar:3.9.12:compile [INFO] | | | +- io.netty:netty-common:jar:4.1.87.Final:compile [INFO] | | | +- io.netty:netty-buffer:jar:4.1.87.Final:compile [INFO] | | | +- io.netty:netty-transport:jar:4.1.87.Final:compile [INFO] | | | +- io.netty:netty-handler:jar:4.1.87.Final:compile [INFO] | | | | +- io.netty:netty-transport-native-unix-common:jar:4.1.87.Final:compile [INFO] | | | | - io.netty:netty-codec:jar:4.1.87.Final:compile [INFO] | | | +- io.netty:netty-handler-proxy:jar:4.1.87.Final:compile [INFO] | | | | - io.netty:netty-codec-socks:jar:4.1.87.Final:compile [INFO] | | | +- io.netty:netty-codec-http:jar:4.1.87.Final:compile [INFO] | | | +- io.netty:netty-codec-http2:jar:4.1.87.Final:compile [INFO] | | | +- io.netty:netty-resolver:jar:4.1.87.Final:compile [INFO] | | | - io.netty:netty-resolver-dns:jar:4.1.87.Final:compile [INFO] | | | - io.netty:netty-codec-dns:jar:4.1.87.Final:compile [INFO] | | +- commons-codec:commons-codec:jar:1.15:compile [INFO] | | +- com.amazonaws:aws-java-sdk-sts:jar:1.12.292:compile [INFO] | | +- org.apache.httpcomponents:httpclient:jar:4.5.13:compile [INFO] | | +- org.apache.httpcomponents:httpcore:jar:4.4.15:compile [INFO] | | +- io.vertx:vertx-web-client:jar:3.9.12:compile [INFO] | | | - io.vertx:vertx-web-common:jar:3.9.12:compile [INFO] | | +- io.swagger:swagger-annotations:jar:1.5.21:compile [INFO] | | +- io.swagger.core.v3:swagger-core:jar:2.0.5:compile [INFO] | | | +- javax.xml.bind:jaxb-api:jar:2.3.1:compile [INFO] | | | +- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.14.0-rc1:compile [INFO] | | | +- io.swagger.core.v3:swagger-annotations:jar:2.1.2:compile [INFO] | | | - io.swagger.core.v3:swagger-models:jar:2.0.5:compile [INFO] | | +- io.vertx:vertx-web:jar:3.9.12:compile [INFO] | | | +- io.vertx:vertx-auth-common:jar:3.9.12:compile [INFO] | | | - io.vertx:vertx-bridge-common:jar:3.9.12:compile [INFO] | | +- ch.qos.logback:logback-classic:jar:1.2.11:compile [INFO] | | | - ch.qos.logback:logback-core:jar:1.2.11:compile [INFO] | | +- ch.qos.logback.contrib:logback-json-classic:jar:0.1.5:compile [INFO] | | | - ch.qos.logback.contrib:logback-json-core:jar:0.1.5:compile [INFO] | | +- ch.qos.logback.contrib:logback-jackson:jar:0.1.5:compile [INFO] | | - io.vertx:vertx-health-check:jar:3.9.12:compile [INFO] | +- [INFO] | | +- [INFO] | | | +- io.vertx:vertx-unit:jar:3.9.12:compile [INFO] | | | +- io.vertx:vertx-config-kubernetes-configmap:jar:3.9.12:compile [INFO] | | | | - io.vertx:vertx-config:jar:3.9.12:compile [INFO] | | | +- io.vertx:vertx-config-yaml:jar:3.9.12:compile [INFO] | | | +- io.vertx:vertx-lang-js:jar:3.9.12:compile [INFO] | | | | - io.vertx:vertx-lang-js-gen:jar:3.9.12:compile [INFO] | | | | - io.vertx:vertx-micrometer-metrics:jar:3.9.12:compile [INFO] | | | +- com.networknt:json-schema-validator:jar:1.0.40:compile [INFO] | | | | - org.jruby.joni:joni:jar:2.1.31:compile [INFO] | | | | - org.jruby.jcodings:jcodings:jar:1.0.46:compile [INFO] | | | +- de.unkrig.jdisasm:jdisasm:jar:1.0.0:runtime [INFO] | | | +- org.powermock:powermock-module-junit4:jar:2.0.9:compile [INFO] | | | | +- org.powermock:powermock-module-junit4-common:jar:2.0.9:compile [INFO] | | | | | +- org.powermock:powermock-reflect:jar:2.0.9:compile [INFO] | | | | | - org.powermock:powermock-core:jar:2.0.9:compile [INFO] | | | | | - org.javassist:javassist:jar:3.27.0-GA:compile [INFO] | | | | +- junit:junit:jar:4.13.2:compile [INFO] | | | | - org.hamcrest:hamcrest-core:jar:2.2:compile [INFO] | | | | - org.hamcrest:hamcrest:jar:2.2:compile [INFO] | | | +- org.powermock:powermock-api-mockito2:jar:2.0.9:compile [INFO] | | | | - org.powermock:powermock-api-support:jar:2.0.9:compile [INFO] | | | +- org.junit.vintage:junit-vintage-engine:jar:5.8.2:compile [INFO] | | | - io.vertx:vertx-junit5:jar:3.9.12:compile [INFO] | | +- [INFO] | | | +- io.vertx:vertx-codegen:jar:3.9.12:compile [INFO] | | | | - org.mvel:mvel2:jar:2.3.1.Final:compile [INFO] | +- com.sun.xml.ws:jaxws-ri:pom:2.3.2:compile [INFO] | | +- com.sun.xml.ws:jaxws-rt:jar:2.3.2:compile [INFO] | | | +- org.glassfish.jaxb:jaxb-runtime:jar:2.3.6:compile [INFO] | | | | +- org.glassfish.jaxb:txw2:jar:2.3.6:compile [INFO] | | | | +- com.sun.istack:istack-commons-runtime:jar:3.0.12:compile [INFO] | | | | - com.sun.activation:jakarta.activation:jar:1.2.2:runtime [INFO] | | | +- com.sun.xml.ws:policy:jar:2.7.6:compile [INFO] | | | +- org.glassfish.gmbal:gmbal:jar:4.0.0:compile [INFO] | | | | +- org.glassfish.external:management-api:jar:3.2.1:compile [INFO] | | | | +- org.glassfish.pfl:pfl-basic:jar:4.0.1:compile [INFO] | | | | +- org.glassfish.pfl:pfl-tf:jar:4.0.1:compile [INFO] | | | | | +- org.glassfish.pfl:pfl-asm:jar:4.0.1:compile [INFO] | | | | | - org.glassfish.pfl:pfl-dynamic:jar:4.0.1:compile [INFO] | | | | +- org.glassfish.pfl:pfl-basic-tools:jar:4.0.1:compile [INFO] | | | | - org.glassfish.pfl:pfl-tf-tools:jar:4.0.1:compile [INFO] | | | +- org.jvnet.staxex:stax-ex:jar:1.8.1:compile [INFO] | | | +- com.sun.xml.stream.buffer:streambuffer:jar:1.5.7:compile [INFO] | | | +- org.jvnet.mimepull:mimepull:jar:1.9.15:compile [INFO] | | | +- com.sun.xml.fastinfoset:FastInfoset:jar:1.2.16:compile [INFO] | | | +- org.glassfish.ha:ha-api:jar:3.1.12:compile [INFO] | | | +- com.sun.xml.messaging.saaj:saaj-impl:jar:1.5.3:compile [INFO] | | | +- com.fasterxml.woodstox:woodstox-core:jar:5.1.0:runtime [INFO] | | | +- org.codehaus.woodstox:stax2-api:jar:4.1:runtime [INFO] | | | - jakarta.activation:jakarta.activation-api:jar:1.2.2:compile [INFO] | | +- com.sun.xml.ws:jaxws-tools:jar:2.3.2:compile [INFO] | | | +- com.sun.xml.bind:jaxb-xjc:jar:2.3.2:compile [INFO] | | | - com.sun.xml.bind:jaxb-jxc:jar:2.3.2:compile [INFO] | | +- com.sun.xml.ws:jaxws-eclipselink-plugin:jar:2.3.2:compile [INFO] | | | +- jakarta.mail:jakarta.mail-api:jar:1.6.7:compile [INFO] | | | +- jakarta.persistence:jakarta.persistence-api:jar:2.2.3:compile [INFO] | | | - org.eclipse.persistence:org.eclipse.persistence.moxy:jar:2.7.4:compile [INFO] | | | - org.eclipse.persistence:org.eclipse.persistence.core:jar:2.7.4:compile [INFO] | | | - org.eclipse.persistence:org.eclipse.persistence.asm:jar:2.7.4:compile [INFO] | | +- com.sun.xml.ws:sdo-eclipselink-plugin:jar:2.3.2:compile [INFO] | | | +- org.eclipse.persistence:org.eclipse.persistence.sdo:jar:2.7.4:compile [INFO] | | | - org.eclipse.persistence:commonj.sdo:jar:2.1.1:compile [INFO] | | +- com.sun.xml.ws:release-documentation:zip:docbook:2.3.2:compile [INFO] | | +- com.sun.xml.ws:samples:zip:2.3.2:compile [INFO] | | +- jakarta.xml.ws:jakarta.xml.ws-api:jar:2.3.3:compile [INFO] | | +- jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.3:compile [INFO] | | +- jakarta.xml.soap:jakarta.xml.soap-api:jar:1.4.2:compile [INFO] | | - jakarta.jws:jakarta.jws-api:jar:1.1.1:compile [INFO] | - org.apache.axis:axis:jar:1.4:compile [INFO] +- [INFO] | +- org.springframework.boot:spring-boot-starter-security:jar:2.6.9:compile [INFO] | | +- org.springframework.security:spring-security-config:jar:5.6.9:compile [INFO] | | - org.springframework.security:spring-security-web:jar:5.6.9:compile [INFO] | +- org.springframework.security.oauth:spring-security-oauth2:jar:2.5.2.RELEASE:compile [INFO] | | - org.springframework.security:spring-security-core:jar:5.6.9:compile [INFO] | +- org.springframework.security:spring-security-oauth2-resource-server:jar:5.6.9:compile [INFO] | | - org.springframework.security:spring-security-oauth2-core:jar:5.6.9:compile [INFO] | +- org.springframework.security:spring-security-oauth2-jose:jar:5.6.9:compile [INFO] | | - com.nimbusds:nimbus-jose-jwt:jar:9.14:compile [INFO] | | - com.github.stephenc.jcip:jcip-annotations:jar:1.0-1:compile [INFO] | - com.fasterxml.jackson.core:jackson-databind:jar:2.14.0-rc1:compile [INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.14.0-rc1:compile [INFO] | - com.fasterxml.jackson.core:jackson-core:jar:2.14.0-rc1:compile [INFO] +- org.springframework.cloud:spring-cloud-starter-openfeign:jar:3.1.3:compile [INFO] | +- org.springframework.cloud:spring-cloud-starter:jar:3.1.3:compile [INFO] | | +- org.springframework.cloud:spring-cloud-context:jar:3.1.3:compile [INFO] | | - org.springframework.security:spring-security-rsa:jar:1.0.10.RELEASE:compile [INFO] | | - org.bouncycastle:bcpkix-jdk15on:jar:1.68:compile [INFO] | +- org.springframework.cloud:spring-cloud-openfeign-core:jar:3.1.3:compile [INFO] | | +- org.springframework.boot:spring-boot-starter-aop:jar:2.6.9:compile [INFO] | | | - org.aspectj:aspectjweaver:jar:1.9.7:compile [INFO] | | - io.github.openfeign.form:feign-form-spring:jar:3.8.0:compile [INFO] | | +- io.github.openfeign.form:feign-form:jar:3.8.0:compile [INFO] | | - commons-fileupload:commons-fileupload:jar:1.4:compile [INFO] | +- org.springframework.cloud:spring-cloud-commons:jar:3.1.3:compile [INFO] | | - org.springframework.security:spring-security-crypto:jar:5.6.9:compile [INFO] | +- io.github.openfeign:feign-core:jar:11.8:compile [INFO] | - io.github.openfeign:feign-slf4j:jar:11.8:compile [INFO] +- io.github.openfeign:feign-okhttp:jar:11.8:compile [INFO] | - com.squareup.okhttp3:okhttp:jar:4.10.0:compile [INFO] | +- com.squareup.okio:okio-jvm:jar:3.0.0:compile [INFO] | | +- org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:1.6.21:compile [INFO] | | | - org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:1.6.21:compile [INFO] | | - org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.6.21:compile [INFO] | - org.jetbrains.kotlin:kotlin-stdlib:jar:1.6.21:compile [INFO] | - org.jetbrains:annotations:jar:13.0:compile [INFO] +- org.modelmapper:modelmapper:jar:3.1.0:compile [INFO] +- io.opentelemetry:opentelemetry-api:jar:1.22.0:compile [INFO] | - io.opentelemetry:opentelemetry-context:jar:1.22.0:compile [INFO] +- io.opentelemetry:opentelemetry-sdk:jar:1.22.0:compile [INFO] | +- io.opentelemetry:opentelemetry-sdk-common:jar:1.22.0:compile [INFO] | | - io.opentelemetry:opentelemetry-semconv:jar:1.22.0-alpha:runtime [INFO] | +- io.opentelemetry:opentelemetry-sdk-trace:jar:1.22.0:compile [INFO] | +- io.opentelemetry:opentelemetry-sdk-metrics:jar:1.22.0:compile [INFO] | - io.opentelemetry:opentelemetry-sdk-logs:jar:1.22.0-alpha:runtime [INFO] | - io.opentelemetry:opentelemetry-api-logs:jar:1.22.0-alpha:runtime [INFO] +- javax.validation:validation-api:jar:2.0.1.Final:compile [INFO] +- org.hibernate.validator:hibernate-validator:jar:6.2.3.Final:compile [INFO] | +- jakarta.validation:jakarta.validation-api:jar:2.0.2:compile [INFO] | +- org.jboss.logging:jboss-logging:jar:3.4.3.Final:compile [INFO] | - com.fasterxml:classmate:jar:1.5.1:compile [INFO] +- org.junit.jupiter:junit-jupiter-api:jar:5.8.2:test [INFO] | +- org.opentest4j:opentest4j:jar:1.2.0:compile [INFO] | +- org.junit.platform:junit-platform-commons:jar:1.8.2:compile [INFO] | - org.apiguardian:apiguardian-api:jar:1.1.2:compile [INFO] +- org.junit.jupiter:junit-jupiter-engine:jar:5.8.2:test [INFO] | - org.junit.platform:junit-platform-engine:jar:1.8.2:compile [INFO] +- org.mockito:mockito-junit-jupiter:jar:4.0.0:test [INFO] | - org.mockito:mockito-core:jar:4.0.0:compile [INFO] | +- net.bytebuddy:byte-buddy:jar:1.11.22:compile [INFO] | +- net.bytebuddy:byte-buddy-agent:jar:1.11.22:compile [INFO] | - org.objenesis:objenesis:jar:3.2:compile [INFO] +- co.elastic.apm:apm-agent-attach:jar:1.34.1:compile [INFO] | +- net.java.dev.jna:jna:jar:5.3.1:compile [INFO] | - net.java.dev.jna:jna-platform:jar:5.3.1:compile [INFO] +- co.elastic.apm:apm-agent-api:jar:1.34.1:compile [INFO] +- [INFO] | +- org.springframework.cloud:spring-cloud-stream-binder-kinesis:jar:2.2.0:compile [INFO] | | +- com.amazonaws:amazon-kinesis-client:jar:1.14.3:compile [INFO] | | | +- com.amazonaws:aws-java-sdk-cloudwatch:jar:1.12.292:compile [INFO] | | | - com.google.protobuf:protobuf-java:jar:3.21.7:compile [INFO] | | +- com.amazonaws:amazon-kinesis-producer:jar:0.14.6:compile [INFO] | | | +- commons-io:commons-io:jar:2.4:compile [INFO] | | | +- commons-lang:commons-lang:jar:2.6:compile [INFO] | | | - software.amazon.glue:schema-registry-serde:jar:1.0.0:compile [INFO] | | | +- software.amazon.glue:schema-registry-common:jar:1.0.0:compile [INFO] | | | | +- software.amazon.awssdk:glue:jar:2.15.32:compile [INFO] | | | | | +- software.amazon.awssdk:protocol-core:jar:2.15.32:compile [INFO] | | | | | +- software.amazon.awssdk:auth:jar:2.15.32:compile [INFO] | | | | | | - software.amazon.eventstream:eventstream:jar:1.0.1:compile [INFO] | | | | | +- software.amazon.awssdk:http-client-spi:jar:2.15.32:compile [INFO] | | | | | +- software.amazon.awssdk:regions:jar:2.15.32:compile [INFO] | | | | | +- software.amazon.awssdk:aws-core:jar:2.15.32:compile [INFO] | | | | | +- software.amazon.awssdk:metrics-spi:jar:2.15.32:compile [INFO] | | | | | +- software.amazon.awssdk:apache-client:jar:2.15.32:runtime [INFO] | | | | | - software.amazon.awssdk:netty-nio-client:jar:2.15.32:runtime [INFO] | | | | | +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.87.Final:runtime [INFO] | | | | | | - io.netty:netty-transport-classes-epoll:jar:4.1.87.Final:runtime [INFO] | | | | | - com.typesafe.netty:netty-reactive-streams-http:jar:2.0.4:runtime [INFO] | | | | | - com.typesafe.netty:netty-reactive-streams:jar:2.0.4:runtime [INFO] | | | | +- software.amazon.awssdk:aws-json-protocol:jar:2.15.30:compile [INFO] | | | | +- software.amazon.awssdk:cloudwatch:jar:2.15.30:compile [INFO] | | | | | - software.amazon.awssdk:aws-query-protocol:jar:2.15.30:compile [INFO] | | | | +- software.amazon.awssdk:sdk-core:jar:2.15.30:compile [INFO] | | | | | - software.amazon.awssdk:profiles:jar:2.15.30:compile [INFO] | | | | +- org.apache.kafka:kafka-clients:jar:3.0.1:compile [INFO] | | | | | +- com.github.luben:zstd-jni:jar:1.5.0-2:runtime [INFO] | | | | | +- org.lz4:lz4-java:jar:1.7.1:runtime [INFO] | | | | | - org.xerial.snappy:snappy-java:jar:1.1.8.1:runtime [INFO] | | | | - org.apache.kafka:kafka-streams:jar:3.0.1:compile [INFO] | | | | - org.rocksdb:rocksdbjni:jar:6.19.3:compile [INFO] | | | +- software.amazon.awssdk:arns:jar:2.15.26:compile [INFO] | | | | +- software.amazon.awssdk:annotations:jar:2.15.26:compile [INFO] | | | | - software.amazon.awssdk:utils:jar:2.15.26:compile [INFO] | | | +- org.apache.avro:avro:jar:1.9.2:compile [INFO] | | | | - org.apache.commons:commons-compress:jar:1.21:compile [INFO] | | | - org.projectlombok:lombok-utils:jar:1.18.12:compile [INFO] | | +- org.springframework.cloud:spring-cloud-stream:jar:3.2.4:compile [INFO] | | | +- org.springframework.boot:spring-boot-starter-validation:jar:2.6.9:compile [INFO] | | | +- org.springframework.integration:spring-integration-jmx:jar:5.5.13:compile [INFO] | | | - org.springframework.cloud:spring-cloud-function-context:jar:3.2.8:compile [INFO] | | | +- net.jodah:typetools:jar:0.6.2:compile [INFO] | | | +- org.springframework.cloud:spring-cloud-function-core:jar:3.2.5:compile [INFO] | | | - javax.annotation:javax.annotation-api:jar:1.3.2:compile [INFO] | | +- io.awspring.cloud:spring-cloud-starter-aws:jar:2.3.1:compile [INFO] | | | +- io.awspring.cloud:spring-cloud-aws-context:jar:2.3.1:compile [INFO] | | | - io.awspring.cloud:spring-cloud-aws-autoconfigure:jar:2.3.1:compile [INFO] | | +- com.amazonaws:aws-java-sdk-dynamodb:jar:1.12.292:compile [INFO] | | +- com.amazonaws:dynamodb-lock-client:jar:1.1.0:compile [INFO] | | - com.amazonaws:dynamodb-streams-kinesis-adapter:jar:1.5.2:compile [INFO] | +- com.amazonaws:aws-java-sdk-kinesis:jar:1.12.292:compile [INFO] | | - com.amazonaws:jmespath-java:jar:1.12.292:compile [INFO] | +- io.micrometer:micrometer-registry-prometheus:jar:1.8.7:compile [INFO] | | - io.prometheus:simpleclient_common:jar:0.12.0:compile [INFO] | | - io.prometheus:simpleclient:jar:0.12.0:compile [INFO] | | +- io.prometheus:simpleclient_tracer_otel:jar:0.12.0:compile [INFO] | | | - io.prometheus:simpleclient_tracer_common:jar:0.12.0:compile [INFO] | | - io.prometheus:simpleclient_tracer_otel_agent:jar:0.12.0:compile [INFO] | +- org.hibernate.validator:hibernate-validator-annotation-processor:jar:6.2.3.Final:compile [INFO] | +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.14.0-rc1:compile [INFO] | +- com.amazonaws:aws-java-sdk-core:jar:1.12.292:compile [INFO] | | +- commons-logging:commons-logging:jar:1.1.3:compile [INFO] | | +- software.amazon.ion:ion-java:jar:1.0.2:compile [INFO] | | +- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.14.0-rc1:compile [INFO] | | - joda-time:joda-time:jar:2.8.1:compile [INFO] | +- [INFO] | | - org.springframework.cloud:spring-cloud-starter-kubernetes-fabric8-config:jar:2.1.3:compile [INFO] | | +- org.springframework.cloud:spring-cloud-kubernetes-commons:jar:2.1.3:compile [INFO] | | +- org.springframework.cloud:spring-cloud-kubernetes-fabric8-autoconfig:jar:2.1.3:compile [INFO] | | | - io.fabric8:kubernetes-client:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-core:jar:5.10.2:compile [INFO] | | | | - io.fabric8:kubernetes-model-common:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-rbac:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-admissionregistration:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-apps:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-autoscaling:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-apiextensions:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-batch:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-certificates:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-coordination:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-discovery:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-events:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-extensions:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-flowcontrol:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-networking:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-metrics:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-policy:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-scheduling:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-storageclass:jar:5.10.2:compile [INFO] | | | +- io.fabric8:kubernetes-model-node:jar:5.10.2:compile [INFO] | | | +- com.squareup.okhttp3:logging-interceptor:jar:4.10.0:compile [INFO] | | | +- io.fabric8:zjsonpatch:jar:0.3.0:compile [INFO] | | | - com.github.mifmif:generex:jar:1.0.2:compile [INFO] | | | - dk.brics.automaton:automaton:jar:1.11-8:compile [INFO] | | - org.springframework.cloud:spring-cloud-kubernetes-fabric8-config:jar:2.1.3:compile [INFO] | | - org.springframework.cloud:spring-cloud-starter-bootstrap:jar:3.1.3:compile [INFO] | +- org.springframework.boot:spring-boot-devtools:jar:2.6.9:compile [INFO] | - org.apache.commons:commons-lang3:jar:3.12.0:compile [INFO] +- [INFO] | +- net.logstash.logback:logstash-logback-encoder:jar:6.4:compile [INFO] | +- com.github.skjolber.logback-logstash-syntax-highlighting-decorators:logback-logstash-syntax-highlighting-decorators:jar:1.0.5:compile [INFO] | | +- com.github.skjolber.jackson:jackson-syntax-highlight:jar:1.0.6:compile [INFO] | | - org.apache.commons:commons-text:jar:1.10.0:compile [INFO] | - org.codehaus.janino:janino:jar:3.1.7:compile [INFO] | - org.codehaus.janino:commons-compiler:jar:3.1.7:compile [INFO] +- org.springframework.integration:spring-integration-aws:jar:2.5.4:compile [INFO] | +- org.springframework.integration:spring-integration-core:jar:5.5.13:compile [INFO] | | +- org.springframework:spring-messaging:jar:5.3.21:compile [INFO] | | +- org.springframework:spring-tx:jar:5.3.21:compile [INFO] | | +- org.springframework.retry:spring-retry:jar:1.3.3:compile [INFO] | | - io.projectreactor:reactor-core:jar:3.4.19:compile [INFO] | | - org.reactivestreams:reactive-streams:jar:1.0.4:compile [INFO] | - io.awspring.cloud:spring-cloud-aws-core:jar:2.4.2:compile [INFO] | +- com.amazonaws:aws-java-sdk-s3:jar:1.12.292:compile [INFO] | +- com.amazonaws:aws-java-sdk-ec2:jar:1.12.292:compile [INFO] | - javax.activation:javax.activation-api:jar:1.2.0:compile [INFO] - io.opentelemetry:opentelemetry-extension-annotations:jar:0.15.0:compile [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 3.049 s [INFO] Finished at: 2023-02-08T18:12:17+02:00 [INFO] ------------------------------------------------------------------------

artembilan commented 1 year ago

OK. I see what is going on. We do have this code:

        this.shardIterator =
                                                KinesisMessageDrivenChannelAdapter.this
                                                        .amazonKinesis
                                                        .getShardIterator(shardIteratorRequest)
                                                        .getShardIterator();
                                        if (this.shardIterator == null) {
                                            // The shard is closed - stop consumer
                                            this.state = ConsumerState.STOP;
                                        }

Where that shardIteratorRequest is populated with a checkpoint which is invalid for this shard. Therefore we fail on this getShardIterator() and cannot reach that if (this.shardIterator == null).

We probably can catch this error and skip such a shard. I wish that AWS Kinesis would always return null for a closed shard. Or if they would have an API to see that shard is closed...

artembilan commented 1 year ago

OK. found something: https://stackoverflow.com/questions/26318004/how-do-i-tell-the-status-of-a-kinesis-shard

                if (shard.getSequenceNumberRange().getEndingSequenceNumber() == null) {
                    System.out.println("shard(" + i + "): " + shard.getShardId() + " is OPEN.");
                } else {
                    System.out.println("shard(" + i + "): " + shard.getShardId() + " is CLOSED.");
                }

Thank you for the report! Will fix this shortly.

artembilan commented 1 year ago

Please, consider to test your solution against spring-integration-aws-2.5.5-SNAPSHOT. You probably would need to add a https://repo.spring.io/snapshot into your pom as a repo.

DanNemes commented 1 year ago

Thank you! I will perform a test as soon as I can. I'll let you know if everything is fine.

DanNemes commented 1 year ago

This works perfectly! Thank you very much!