harlow / kinesis-consumer

Golang library for consuming Kinesis stream data
MIT License

Consumer (possibly) stuck if sequence # refers to expired data #125

Open jtackaberry opened 4 years ago

jtackaberry commented 4 years ago

I have the following scenario:

  1. Kinesis data stream with 24 hour retention
  2. A previous run of kinesis-consumer with a checkpointed sequence number
  3. kinesis-consumer stopped for > 24 hours such that the stored sequence number refers to expired data

Now I find that upon restarting the consumer, it keeps reading 0 records with MillisBehindLatest stuck at 86400000 (24 hours). The shard iterator changes after each GetRecords call, but every request yields 0 records and reports 86400000 ms behind.

I've let this run for about 15 minutes of continuously polling GetRecords without finding any actual records. I can't be sure it wouldn't eventually find data and finally advance the sequence number, but at least after ~15 minutes of that there wasn't any real sign of progress.

So I began investigating, and implemented the following bit of logic in the consumer which quickly (within about 5 seconds) gets things moving again:

  1. In Consumer.Scan() call c.client.DescribeStreamSummaryWithContext() to discover the retention period for the stream
  2. In Consumer.ScanShard() if GetRecords returns 0 records and MillisBehindLatest is >= the retention period from step 1, then we can infer we have encountered this condition, and so we fetch a new shard iterator of type ShardIteratorTypeTrimHorizon.
  3. Step 2 is done at most once when the condition exists, and never if records have ever been observed from the shard, to better ensure we are targeting only this edge case.
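
The decision in steps 2 and 3 can be sketched as a small pure function. This is only an illustration of the idea, not the patch itself: `shardState` and `shouldResetToTrimHorizon` are hypothetical names, and `retentionHours` is assumed to be the RetentionPeriodHours value returned by DescribeStreamSummary.

```go
package main

import "fmt"

// shardState is a hypothetical per-shard bookkeeping struct; the library
// does not define it.
type shardState struct {
	sawRecords bool // true once any GetRecords call returned records
	resetDone  bool // true once we have already fallen back to TRIM_HORIZON
}

// shouldResetToTrimHorizon reports whether the scanner should drop its
// current iterator and request one of type TRIM_HORIZON instead.
// retentionHours is assumed to come from DescribeStreamSummary.
func shouldResetToTrimHorizon(st *shardState, numRecords int, millisBehindLatest, retentionHours int64) bool {
	if numRecords > 0 {
		// Real data was read, so the checkpoint is not pointing at expired data.
		st.sawRecords = true
		return false
	}
	if st.sawRecords || st.resetDone {
		// Step 3: never after records were seen, and at most once per shard.
		return false
	}
	retentionMillis := retentionHours * 60 * 60 * 1000
	if millisBehindLatest < retentionMillis {
		return false
	}
	st.resetDone = true
	return true
}

func main() {
	st := &shardState{}
	// Empty batch while pinned at the 24 hour retention limit: reset once.
	fmt.Println(shouldResetToTrimHorizon(st, 0, 86400000, 24)) // true
	// Same symptom again: the reset is not repeated.
	fmt.Println(shouldResetToTrimHorizon(st, 0, 86400000, 24)) // false
}
```

When the function returns true, the caller would request a fresh iterator with ShardIteratorTypeTrimHorizon and continue scanning from there.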

Like the immediate rescan stuff from #122, fetching ShardIteratorTypeTrimHorizon also takes the fast path, skipping the scan ticker. But I've also implemented the "fast path" via a new ticker which is fixed at 200ms, based on the published data plane API limits, which also addresses the throttling concern mentioned in #122.
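
For reference, the 200ms figure follows from the documented GetRecords limit of 5 calls per second per shard. A minimal sketch of that arithmetic, where `pollInterval` is a hypothetical helper and the 1 second scan interval is just an assumed example value:

```go
package main

import (
	"fmt"
	"time"
)

// getRecordsCallsPerSecond is the documented per-shard GetRecords limit.
const getRecordsCallsPerSecond = 5

// fastPathInterval is the shortest delay between polls that stays within
// that limit: 1s / 5 = 200ms.
const fastPathInterval = time.Second / getRecordsCallsPerSecond

// pollInterval is a hypothetical helper: on the fast path it ignores the
// configured scan interval and uses the fixed 200ms interval instead.
func pollInterval(fastPath bool, scanInterval time.Duration) time.Duration {
	if fastPath {
		return fastPathInterval
	}
	return scanInterval
}

func main() {
	scanInterval := time.Second // assumed example value
	fmt.Println(pollInterval(true, scanInterval))  // 200ms
	fmt.Println(pollInterval(false, scanInterval)) // 1s
}
```

Note that the limit is per shard, so other consumers reading the same shard share that budget.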

I can clean up the code and submit a PR but wanted to float the idea by you. Obviously all this work I've been doing lately is really complicating your beautifully simple shard scanning loop, but I am finding that dealing with Kinesis edge cases in practice is a bit of a subtle science and exact art. :)

jtackaberry commented 4 years ago

Another noteworthy thing about this is that it would require an additional permission on the stream for kinesis:DescribeStreamSummary (or kinesis:DescribeStream*), so it may need to be opt-in.
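
One way the opt-in could look, assuming a functional-options style constructor. Everything here is a stand-in for illustration: `Consumer`, `Option`, `New`, and `WithExpiredSequenceRecovery` are hypothetical and are not taken from the library.

```go
package main

import "fmt"

// Consumer is a stand-in type for this sketch, not the library's type.
type Consumer struct {
	stream         string
	recoverExpired bool // when false, DescribeStreamSummary is never called
}

// Option configures a Consumer.
type Option func(*Consumer)

// WithExpiredSequenceRecovery is a hypothetical option that enables the
// retention check, and with it the need for kinesis:DescribeStreamSummary.
func WithExpiredSequenceRecovery() Option {
	return func(c *Consumer) { c.recoverExpired = true }
}

// New builds a Consumer with the recovery logic disabled by default.
func New(stream string, opts ...Option) *Consumer {
	c := &Consumer{stream: stream}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(New("events").recoverExpired)                                // false
	fmt.Println(New("events", WithExpiredSequenceRecovery()).recoverExpired) // true
}
```

Keeping the default off means existing deployments would not need any IAM changes unless they enable the behaviour.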