harlow / kinesis-consumer

Golang library for consuming Kinesis stream data
MIT License

in-memory checkpoint? #101

Closed. ashanbrown closed this issue 5 years ago

ashanbrown commented 5 years ago

Locally, I use an in-memory checkpoint store in my tests. It looks like this:


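import "sync"

// InMemoryKinesisConsumerStore keeps checkpoints in process memory.
// The embedded sync.Map makes it safe for concurrent use.
type InMemoryKinesisConsumerStore struct {
    sync.Map
}

// SetCheckpoint stores the sequence number under the key "stream:shard".
func (c *InMemoryKinesisConsumerStore) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
    c.Store(streamName+":"+shardID, sequenceNumber)
    return nil
}

// GetCheckpoint returns the stored sequence number, or "" if none has been set.
func (c *InMemoryKinesisConsumerStore) GetCheckpoint(streamName, shardID string) (string, error) {
    val, ok := c.Load(streamName + ":" + shardID)
    if !ok {
        return "", nil
    }
    return val.(string), nil
}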

This is rather similar to fakeCheckpoint (which uses a mutex instead of sync.Map) at https://github.com/harlow/kinesis-consumer/blob/14db23eaf34c2d559a90fa985beea710a831d10b/consumer_test.go#L339.
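For comparison, here is a minimal sketch of what a mutex-guarded variant of the same two methods could look like. The type name mutexCheckpointStore and its field names are hypothetical, and this is not the fakeCheckpoint code from consumer_test.go; it only illustrates the mutex-plus-map approach next to the sync.Map one.

```go
package main

import "sync"

// mutexCheckpointStore is a hypothetical mutex-guarded counterpart to the
// sync.Map-based store. It is an illustration only, not the library's
// fakeCheckpoint.
type mutexCheckpointStore struct {
	mu          sync.Mutex
	checkpoints map[string]string // key: "stream:shard", value: sequence number
}

// SetCheckpoint records the latest sequence number for a stream/shard pair.
func (s *mutexCheckpointStore) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.checkpoints == nil {
		// Allocate lazily so the zero value of the struct is usable.
		s.checkpoints = make(map[string]string)
	}
	s.checkpoints[streamName+":"+shardID] = sequenceNumber
	return nil
}

// GetCheckpoint returns the stored sequence number, or "" if none was set.
func (s *mutexCheckpointStore) GetCheckpoint(streamName, shardID string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Reading from a nil map is safe and yields the zero value "".
	return s.checkpoints[streamName+":"+shardID], nil
}
```

Both variants behave the same from the caller's side. The plain map with a mutex keeps the value type as string, so no type assertion is needed on reads.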

I'm wondering whether you'd consider promoting this to be part of the API. My use case: kinesalite hangs indefinitely on some requests, so I use a context with a deadline to terminate them. That means I have to set and get checkpoints so that the next scan can resume where the previous one stopped. Since I'm running this in a single process, I'm not eager to add a datastore just to retain the checkpoints. This isn't a lot of code, but I thought it might be useful as part of the API. If you agree, I can draft a PR (presumably it wouldn't live in a separate package like the other stores, since it has no dependencies beyond sync).
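To make the use case concrete, here is a rough sketch of the retry pattern: each scan attempt runs under its own context deadline, and a later attempt resumes from the checkpoint held in memory. It does not call the kinesis-consumer library. The names checkpointStore, memStore, scanOnce, and scanWithDeadline are hypothetical, a string slice stands in for a shard, and slice indices stand in for sequence numbers.

```go
package main

import (
	"context"
	"strconv"
	"sync"
	"time"
)

// checkpointStore mirrors the two methods shown in this issue (assumed shape).
type checkpointStore interface {
	SetCheckpoint(streamName, shardID, sequenceNumber string) error
	GetCheckpoint(streamName, shardID string) (string, error)
}

// memStore is the sync.Map-backed store from this issue under a shorter name.
type memStore struct{ sync.Map }

func (c *memStore) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
	c.Store(streamName+":"+shardID, sequenceNumber)
	return nil
}

func (c *memStore) GetCheckpoint(streamName, shardID string) (string, error) {
	val, ok := c.Load(streamName + ":" + shardID)
	if !ok {
		return "", nil
	}
	return val.(string), nil
}

// scanOnce handles the records that follow the last checkpoint and stops
// early once ctx is done. A real consumer would also pass ctx into the
// blocking request itself; here it is only checked between records.
func scanOnce(ctx context.Context, store checkpointStore, records []string, handle func(string)) error {
	last, err := store.GetCheckpoint("stream", "shard-0")
	if err != nil {
		return err
	}
	next := 0
	if last != "" {
		n, err := strconv.Atoi(last)
		if err != nil {
			return err
		}
		next = n + 1 // resume just after the checkpointed record
	}
	for i := next; i < len(records); i++ {
		if err := ctx.Err(); err != nil {
			return err // deadline exceeded or cancelled
		}
		handle(records[i])
		if err := store.SetCheckpoint("stream", "shard-0", strconv.Itoa(i)); err != nil {
			return err
		}
	}
	return nil
}

// scanWithDeadline retries scanOnce, giving every attempt a fresh deadline.
// It returns nil as soon as one attempt reaches the end of the records.
func scanWithDeadline(store checkpointStore, records []string, perAttempt time.Duration, attempts int, handle func(string)) error {
	var err error
	for a := 0; a < attempts; a++ {
		ctx, cancel := context.WithTimeout(context.Background(), perAttempt)
		err = scanOnce(ctx, store, records, handle)
		cancel()
		if err == nil {
			return nil
		}
	}
	return err
}
```

Because the store lives in the same process as the retry loop, an attempt that times out loses no progress: the next attempt reads the last checkpoint and continues from there. The checkpoints disappear when the process exits, which is acceptable for tests and single-process runs like the one described here.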

harlow commented 5 years ago

Hi @ashanbrown, thanks for the note.

I think this would be a nice addition to the package (and we can probably remove fakeCheckpoint from the tests in favor of this one). You mentioned it might not need to be its own package. While I think there is merit to that, I also like the idea of it being discoverable for anyone looking at the store/ directory.

How do you feel about putting it in store/memory/store.go? This would give it nice symmetry with the other stores.