spring-cloud / spring-cloud-stream-binder-aws-kinesis

Spring Cloud Stream binder for AWS Kinesis
Apache License 2.0

shard-iterator-type=TRIM_HORIZON not working #198


SledgeHammer01 commented 1 year ago

Spring Boot 3.1.0
spring-cloud-stream-binder-kinesis 4.0.0
Java 17
Localstack 2.1.0

The properties look like:

spring.cloud.stream.bindings.input-in-0.destination=xxx-stream
spring.cloud.stream.bindings.input-in-0.group=xxx-group
spring.cloud.stream.bindings.input-in-0.content-type=text/plain

Tried both of these:

spring.cloud.stream.kinesis.bindings.input-in-0.consumer.shard-iterator-type=TRIM_HORIZON
spring.cloud.stream.kinesis.bindings.input.consumer.shard-iterator-type=TRIM_HORIZON

But my consumer is only called for new messages. If I use the AWS CLI get-records against Localstack with TRIM_HORIZON, it returns all of the messages, so nothing is broken on that side.
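For reference, a minimal sketch of the functional consumer that the `input-in-0` binding name implies (the bean name `input` plus the `-in-0` suffix is the Spring Cloud Stream functional binding convention; the `String` payload type is an assumption based on `content-type=text/plain`):

```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KinesisConsumerConfig {

    // Bean name "input" + "-in-0" => binding name "input-in-0",
    // so Kinesis-specific consumer properties go under
    // spring.cloud.stream.kinesis.bindings.input-in-0.consumer.*
    @Bean
    public Consumer<String> input() {
        return payload -> System.out.println("Received: " + payload);
    }
}
```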

artembilan commented 1 year ago

You are probably missing this note from the docs:

NOTE: When the TRIM_HORIZON shard iterator type is used, we need to take into account the time lag that occurs while the ShardIterator moves from the last untrimmed record in the shard (the oldest data record in the shard). So getRecords() will move from that point up to the latest record, which takes time. The retention is 1 day by default and can be extended to 7 days. This happens only for new consumer groups. Any subsequent starts of a consumer in the same group are adjusted according to the stored checkpoint via the AFTER_SEQUENCE_NUMBER iterator type.

See docs: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties

I wonder now whether this behavior really makes sense when the iterator type is configured explicitly. It should still be TRIM_HORIZON for new groups by default, but for an existing group the default should really be AFTER_SEQUENCE_NUMBER, and an explicit TRIM_HORIZON should override that logic, since that is what was actually asked for.
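For illustration, a rough sketch of the decision rule described above (purely hypothetical names, not the binder's actual code; only the ShardIteratorType enum is from the AWS SDK):

```java
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

// Illustrative sketch only: how the proposed rule could resolve the iterator
// type for a consumer group, honoring an explicit TRIM_HORIZON.
public class IteratorTypeResolutionSketch {

    static ShardIteratorType resolve(boolean checkpointExists,
            boolean explicitlyConfigured, ShardIteratorType configured) {

        if (checkpointExists && !explicitlyConfigured) {
            // Existing group with no explicit setting: resume from the stored checkpoint.
            return ShardIteratorType.AFTER_SEQUENCE_NUMBER;
        }
        // New group, or an explicit setting such as TRIM_HORIZON: honor the configuration.
        return configured;
    }
}
```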

I am not sure, though, whether we can make this breaking change in the current 4.0.1. Perhaps it is better to defer it until 4.1.0...