harlow / kinesis-consumer

Golang library for consuming Kinesis stream data
MIT License
263 stars 88 forks source link

[Question] Shard Closed Information #133

Closed jonandrewj closed 3 years ago

jonandrewj commented 3 years ago

I implement a custom consumer.Store to manage the checkpoints. For observability, I keep track of the most recent consumed item on each shard and produce some metrics for those. Unfortunately, when a shard is closed, my application currently doesn't have a way of knowing this unless I reproduce the shard iterating logic to determine if we reached the end of a closed shard. The metrics for those closed shards start to be stale and make my alerting less reliable for those situations.

consumer.go currently has:

if isShardClosed(resp.NextShardIterator, shardIterator) {
    c.logger.Log("[CONSUMER] shard closed:", shardID)
    return nil
}

So I had a couple ideas:

  1. Add a WithShardClosedCallback consumer.Option which would just accept that streamName, shardID so it knows which shards are closed.
  2. Add ShardClosed(streamName, shardID string) to the consumer.Store interface. This would be a breaking change though so probably not ideal. I think it makes sense to add to the consumer.Store interface though since any code managing checkpoints manually should be interested in a shardClosed event.
  3. Add a StoreV2 interface - probably not ideal to try to avoid a breaking change by adding multiple implementations of interfaces in a v0.
  4. Check the provided consumer.Store and see if the underlying object also implements something like:
    type ShardClosedHandler interface {
     ShardClosed(streamName, shardID string)
    }

    and then if it does call the handler.

Basically this would be backwards compatible but still add the functionality to an intuitive place. The downside is that it isn't explicit. The idea is described here: https://blog.golang.org/module-compatibility#TOC_3.

My preference is option 4, but let me know your thoughts and I'd be happy to get a PR going. Thanks

jonandrewj commented 3 years ago

I had a thought today that maybe this functionality would be valuable to a consumer that isn't using a consumer.Store or that wants to use one of the built in options from the library and then implementing the extra interface{} on the consumer.Store would be awkward.

So maybe option 1 is best because the callback can be completely separate from the consumer.Store.

harlow commented 3 years ago

Thanks for the thoughtful issue and follow up. The Versioning of modules was an interesting read.

I think I agree that this functionality could be interesting outside of Store and would def be the path of least resistance wrt backwards compat etc.

harlow commented 3 years ago

If you're game to PR that in I can get it merged for you

jonandrewj commented 3 years ago

Perfect! I'll make a PR for option 1

jonandrewj commented 3 years ago

Here's that PR: https://github.com/harlow/kinesis-consumer/pull/135

I couldn't decide between an interface and a typed function to use as the argument to WithShardClosedHandler. I'm happy to switch it to just a function if you have a preference. That might be more flexible for users 🤷.

jonandrewj commented 3 years ago

After some thought, I think the typed function is more flexible for the user. I added a commit to switch to that.