awslabs / amazon-kinesis-client

Client library for Amazon Kinesis
Apache License 2.0
641 stars 463 forks source link

Correct way to start at TRIM_HORIZON? #531

Open sammefford opened 5 years ago

sammefford commented 5 years ago

When I noticed that the default KCL behavior is to start processing at LATEST when a shard doesn't have a checkpoint, I worried I might have data on my stream that never gets processed. I wanted to start at TRIM_HORIZON instead. So I added to my Scheduler:

            configsBuilder.leaseManagementConfig()
                .initialPositionInStream(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))

and

            configsBuilder.retrievalConfig()
                .initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))

When I made those changes, one of those worked and one didn't, I don't remember which. But now I notice my iterator age frequently climbs on my kinesis stream, and often bumps at 86.4M (24 hours) showing my app is frequently reading from TRIM_HORIZON even though all shards are checkpointed in the dynamodb table. Which of those configs is right? What is inappropriately causing my app to read from TRIM_HORIZON even though all shards are checkpointed in the dynamodb table?

randeepbydesign commented 5 years ago

This is a good question- I'd like to know more about this and transitioning consumers from LATEST to TRIM_HORIZON in general. I feel like this isn't documented very well on the AWS site... or maybe I'm just not looking in the right place.

ytang07 commented 5 years ago

It's the second one: configsBuilder.retrievalConfig() .initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))

yasemin-amzn commented 5 years ago

The initial_position configuration is only used for shards that doesn't have a checkpoint in the DDB table. The configured value will be used as the default value to start reading from new shards at application startup.

Once shard entry in DDB table is updated with the sequence number, i.e. check-pointed, the KCL workers will start reading from Kinesis using this sequence number as the starting position.

The last check-pointed sequence number can be old (even older than 24 hours), and this condition will lead the application to report millisBehind as 24hours on a stream that has 24 hour retention period configured.

Debugging this issue, take a look at the record processing latencies (avg stat for RecordProcessor.processRecords.Time metric), and how frequently the application is making GetRecords call (samplecount stat for KinesisDataFetcher.getRecords.Time metric).

Checkout the documentation on CW metrics that will help troubleshooting KCL application issue at: https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html

cozos commented 3 years ago

Hi @yasemin-amzn

What happens the last checkpointed sequence number is old ( > 24 hours)? Will it still be able to get a new ShardIterator?

maanasa commented 9 months ago

It seems like the below method is deprecated: configsBuilder.retrievalConfig() .initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))

What is the current way to read from TRIM_HORIZON?

cap10morgan commented 4 months ago

Any insight on @maanasa's question? I can't find any new information / docs on how to set this with current versions of the KCL.

StephanWels commented 1 month ago

Any updates? This really feels poorly documented :/

cap10morgan commented 1 month ago

I eventually figured it out. Here's a basic example (apologies if there are errors, I'm translating from Clojure interop):

import software.amazon.kinesis.common.*;
import software.amazon.kinesis.processor.*;

InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
SingleStreamTracker stt = SingleStreamTracker.new("stream-name", initialPosition);
ConfigsBuilder configsBuilder = ConfigsBuilder.new(stt, "stream-name", kinesisClient, dynamoDbClient, cloudWatchClient, configUuid, shardRecordProcessorFactory);

Let me know if that's not enough context.

And yes, would be good to still get this officially documented.

StephanWels commented 1 month ago

Thank you very much @cap10morgan! - that was indeed very helpful.

This whole ConfigsBuilder Pattern seems a bit off... Some config pieces can be configured in multiple places - as in your example, the streamName appears in the StreamTracker.new, as well as in the ConfigBuilder.new call. What happens, if you use different stream names here?

Even stranger: the ConfigBuilder breaks with the otherwise quite consistently introduced (much appreciated!) Builder-pattern.

We're in the process of migrating a whole bunch of services from aws sdk v1 to aws sdk v2 and I must say: It just doesn't feel right :/