twitchscience / kinsumer

Native Go consumer for AWS Kinesis streams.

Add Manual Checkpointing #49

Open FugiTech opened 4 years ago

FugiTech commented 4 years ago

This adds a new config option, WithManualCheckpointing(true), that disables the automatic checkpoint on Next() and adds a new method NextWithCheckpointer() that includes a new return value, func(), which updates the checkpoint when called. This feature allows users to delay checkpointing until a record is fully processed to ensure records are never dropped.

I've also included some additional complexity: Kinsumer (rather than the user) ensures checkpoints are done in order. This is done to prevent race conditions, but it could confuse users, because calling a checkpointer can block indefinitely and leave the checkpoint never updated.

Good Examples:

for {
  record, checkpointer, _ := k.NextWithCheckpointer()
  go func() {
    process(record)
    checkpointer() // may block for a while but will eventually complete
  }()
}

In this example every checkpointer is called, and called in a way that doesn't block processing other records, which ensures that eventually they will update the checkpoint and return. Because processing is done asynchronously it is possible a later record finishes before an earlier record and has to wait, but eventually they should all complete.

for {
  record, checkpointer, _ := k.NextWithCheckpointer()
  process(record)
  checkpointer() // should never block
}

This is also fine since serially calling checkpointers ensures they are called in order, which should never block.

Bad Examples:

var records [][]byte
var lastCheckpointer func()
for i := 0; i < 10; i++ {
  record, checkpointer, _ := k.NextWithCheckpointer()
  records = append(records, record)
  lastCheckpointer = checkpointer
}
process(records)
lastCheckpointer() // blocks forever, never actually updates the checkpoint

In this example the user thought they only needed to call the last checkpointer: since sequence numbers are always increasing, the last value should encompass all prior values. This is not true. Each checkpointer must be called, both to ensure ordering and because records may belong to different shards. This can be fixed by storing all checkpointers in a slice and calling them in order.
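The slice-based fix described above could look roughly like the following sketch. The nextFunc type is a hypothetical stand-in for k.NextWithCheckpointer so the snippet compiles without Kinsumer, and processBatch is an illustrative name, not part of this PR.

```go
package main

// nextFunc is a hypothetical stand-in for k.NextWithCheckpointer from this PR.
type nextFunc func() (record []byte, checkpointer func(), err error)

// processBatch reads n records, processes them together, and then calls every
// checkpointer in the order the records were received.
func processBatch(next nextFunc, n int, process func([][]byte) error) error {
	records := make([][]byte, 0, n)
	checkpointers := make([]func(), 0, n)
	for i := 0; i < n; i++ {
		record, checkpointer, err := next()
		if err != nil {
			return err
		}
		records = append(records, record)
		checkpointers = append(checkpointers, checkpointer)
	}
	if err := process(records); err != nil {
		// No checkpointer has been called yet, so the checkpoint does not advance.
		return err
	}
	// Calling every checkpointer serially, in receive order, should never block.
	for _, checkpointer := range checkpointers {
		checkpointer()
	}
	return nil
}
```

With a real consumer this would be called as processBatch(k.NextWithCheckpointer, 10, process), assuming the method keeps the signature shown in the examples above.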

batches := map[string]batch{}
for {
  record, checkpointer, _ := k.NextWithCheckpointer()
  id := customerIDFromRecord(record)
  b := batches[id]
  b.Records = append(b.Records, record)
  b.Checkpointers = append(b.Checkpointers, checkpointer)
  if len(b.Records) >= 100 {
    process(b.Records)
    for _, checkpointer := range b.Checkpointers {
      checkpointer()
    }
    b = batch{}
  }
  batches[id] = b
}

At first glance this looks fairly reasonable: all checkpointers are being called and order is maintained. However, because the user is splitting by customerID, it is possible for checkpointer() to block forever waiting on a checkpointer for a different customerID, which can now never be called because we are blocking the processing loop. This can be fixed by using go checkpointer(), but logic should also be added to flush batches after a period of time to ensure checkpoints don't fall surprisingly far behind.
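A minimal sketch of both fixes (go checkpointer() plus an age-based flush) might look like this. The batcher type and its methods are hypothetical names invented for illustration. The sketch assumes checkpointers behave as described in this PR, blocking until all earlier checkpointers have been called.

```go
package main

import (
	"sync"
	"time"
)

// batch collects the records for one key together with their checkpointers.
type batch struct {
	records       [][]byte
	checkpointers []func()
	started       time.Time
}

// batcher groups records by a caller-supplied key (for example a customer ID).
// Add and FlushOlderThan are not safe for concurrent use; call them only from
// the loop that reads records.
type batcher struct {
	maxSize int
	process func(key string, records [][]byte)
	batches map[string]*batch
	pending sync.WaitGroup
}

func newBatcher(maxSize int, process func(string, [][]byte)) *batcher {
	return &batcher{maxSize: maxSize, process: process, batches: map[string]*batch{}}
}

// Add appends a record to its batch and flushes the batch once it is full.
func (b *batcher) Add(key string, record []byte, checkpointer func()) {
	cur, ok := b.batches[key]
	if !ok {
		cur = &batch{started: time.Now()}
		b.batches[key] = cur
	}
	cur.records = append(cur.records, record)
	cur.checkpointers = append(cur.checkpointers, checkpointer)
	if len(cur.records) >= b.maxSize {
		b.flush(key)
	}
}

// FlushOlderThan flushes every batch that has been open for at least maxAge,
// so a slow key cannot hold the checkpoint back indefinitely.
func (b *batcher) FlushOlderThan(maxAge time.Duration) {
	for key, cur := range b.batches {
		if time.Since(cur.started) >= maxAge {
			b.flush(key)
		}
	}
}

// flush processes one batch, then calls its checkpointers in goroutines so a
// checkpointer waiting on another key's records cannot stall the read loop.
func (b *batcher) flush(key string) {
	cur := b.batches[key]
	delete(b.batches, key)
	b.process(key, cur.records)
	for _, checkpointer := range cur.checkpointers {
		b.pending.Add(1)
		go func(cp func()) {
			defer b.pending.Done()
			cp()
		}(checkpointer)
	}
}

// Wait blocks until every checkpointer released so far has returned.
func (b *batcher) Wait() { b.pending.Wait() }
```

The read loop would call Add for each record and FlushOlderThan at some interval of the caller's choosing. Wait is only useful at shutdown, once no further records will be read.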

jbeemster commented 3 years ago

We are looking at using this project in a Kinesis Consumer app and this is one of the main concerns we have. We want to be in control of when we acknowledge a message has safely been processed.

Any chance of this change being merged in or are we better off going with a fork of this library?

jbeemster commented 3 years ago

I have now set up my own fork here (https://github.com/snowplow-devops/kinsumer) which combines this PR and the PR for returning a record instead of just the data blob, as we needed both.

Let me know if you would like me to open a PR with those changes coalesced together!

garethlewin commented 3 years ago

Hi,

Sorry I have kind of neglected this branch, I'll try to get this merged ASAP.

jbeemster commented 3 years ago

No worries at all - tested with multiple shards, scaling up and down on decent volume and with multiple consumers so all seems to work as expected.

agoblet commented 3 years ago

I would benefit from this feature as well. My use case is that my consumer needs to batch multiple Kinesis records and forward them together to a SageMaker machine learning model, as this improves the throughput a lot. Without this feature, there will be a risk of data loss.

I am looking into the Python KCL as an alternative, but I do not like the extra Java dependencies in there. Therefore, it would be great if I could use Kinsumer in the end.

garethlewin commented 3 years ago

Sorry, I have taken a break from programming to deal with burnout, I'll be back early Feb to work and will look into this PR.

But I have been thinking about it a lot, and I have some concerns, though I totally understand the needs you guys have expressed.

The first is that I'm concerned about a leaky abstraction here. By returning the aws-sdk-go Kinesis response object it becomes harder to add features that I want to support. Two examples are supporting SubscribeToShard and supporting globbing/batching records (at the very least using the KPL method, but I'd like to add the ability for anyone to supply a splitting function, as at Twitch we use this a lot, and there are other people that use a similar method, like @axelgobletbdr). Would perhaps a "Done()" method on the returned record be a better alternative?

Another concern I have is with a race condition and how to handle it. It's quite easy to imagine calling Next() on Kinsumer and spawning a goroutine to handle that record. In that situation it's very possible that record 3 will complete before record 2, and now I am not sure how Kinsumer should handle such a manual checkpoint. I can leave it up to the application and just ignore it (so if told to checkpoint 3, then checkpoint 2, we just end up with record 2 being the checkpoint). We could ignore checkpoints that are behind the current checkpoint, or I can imagine a more complex solution that stores all checkpoints until a gapless sequence is ready and then checkpoints that.
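To make the "gapless sequence" option concrete, here is a rough sketch of one way it could work. It is not how Kinsumer or this PR is implemented. It assumes each record of a shard is tagged with a consecutive delivery index (0, 1, 2, ...) rather than a raw Kinesis sequence number, and gaplessTracker is a hypothetical name.

```go
package main

import "sync"

// gaplessTracker records which delivery indexes have completed and reports the
// highest index below which every record is done.
type gaplessTracker struct {
	mu   sync.Mutex
	next int          // lowest index that has not completed yet
	done map[int]bool // completed indexes at or above next
}

func newGaplessTracker() *gaplessTracker {
	return &gaplessTracker{done: map[int]bool{}}
}

// Complete marks index as done. It returns the highest index that is safe to
// checkpoint (-1 if none yet) and whether this call moved that index forward.
func (t *gaplessTracker) Complete(index int) (checkpoint int, advanced bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if index < t.next || t.done[index] {
		return t.next - 1, false // already completed; nothing changes
	}
	t.done[index] = true
	// Consume the contiguous run of completed indexes starting at next.
	for t.done[t.next] {
		delete(t.done, t.next)
		t.next++
		advanced = true
	}
	return t.next - 1, advanced
}
```

In the "record 3 before record 2" case, completing 3 first would report no advance, and completing 2 afterwards would move the checkpoint straight to 3. The cost is that one record that never completes holds back the checkpoint for everything after it.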

Since the desire for this stems from preferring the "At least once" to the "No more than once" methodology that Kinsumer was written with, and your desire not to miss data, how tolerant are you of duplicate records? Kinesis itself (and definitely not Kinsumer) can't ensure "exactly once". I ask because at Twitch we already tend to add UUIDs to our records to handle the constant low rate of duplicate records that happens in Kinesis anyway, and was wondering if I should add the caching we do to dedupe those records as a first-class feature of Kinsumer.
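For readers unfamiliar with this kind of dedupe cache, a bounded "recently seen IDs" set is one common shape for it. The sketch below is only an illustration of that idea, not the caching Twitch uses. seenCache and FirstSeen are hypothetical names, and it assumes every record carries a unique ID such as a UUID.

```go
package main

import "container/list"

// seenCache remembers the most recent record IDs, up to a fixed capacity.
// It is not safe for concurrent use.
type seenCache struct {
	capacity int
	order    *list.List // IDs in insertion order, oldest at the front
	ids      map[string]struct{}
}

func newSeenCache(capacity int) *seenCache {
	if capacity < 1 {
		capacity = 1
	}
	return &seenCache{
		capacity: capacity,
		order:    list.New(),
		ids:      make(map[string]struct{}, capacity),
	}
}

// FirstSeen reports whether id has not been seen recently, and remembers it.
// A false result means the record is a duplicate and can be skipped.
func (c *seenCache) FirstSeen(id string) bool {
	if _, ok := c.ids[id]; ok {
		return false
	}
	if c.order.Len() >= c.capacity {
		// Evict the oldest ID to keep memory bounded.
		oldest := c.order.Front()
		c.order.Remove(oldest)
		delete(c.ids, oldest.Value.(string))
	}
	c.order.PushBack(id)
	c.ids[id] = struct{}{}
	return true
}
```

A cache like this only catches duplicates that arrive within the last capacity records, so a replay of a long backlog would still pass through as new data.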

agoblet commented 3 years ago

Hi Gareth. First of all, please do take your time for your recovery, it's more important than some feature in some open source package. I will post my reaction here now, but feel free to read it whenever you are ready.

I do not see a functional difference between a Done method on the record and a checkpointer function. I guess adding them to the returned record struct is a bit neater.

For the race condition, @Fugiman describes that it's already ensured that checkpoints are done in order, and checkpoints should be done for every record. Personally I think that is fine. Alternatively, putting the responsibility for the checkpointing order at the consumer app is fine too.

Since Kinsumer and Kinesis already cause some degree of duplication, clients should already be prepared for duplicates. A higher degree of duplicates should not matter a lot here. I do not know how hard it will be to implement such dedup logic within Kinsumer, given new checkpointing logic, consumer failures, etc. That would be something for another story I would say.

FugiTech commented 3 years ago

I'm glad this PR has been useful for people but after a year of use I'm wary of merging it in as-is. It was originally created for a service I made at Twitch and in our experience it routinely led to issues where checkpoints would not be updated despite the record being processed. This caused the service to replay the entire backlog of the affected Kinesis shard on the next deploy. For our service that was 24 hours of data which would take around 12 hours to catch up on, which was troubling for any real-time uses of the data. For our service we solved this by determining that we're actually OK with dropping a small amount of data if there are issues, and we log the sequence number of what data wasn't processed so that a secondary process can try to recover it.

I believe that if Kinsumer wants to include manual checkpointing it would be better to take the simpler approach of accepting the latest call to Done() as the truth, which would allow potentially skipping past records. If a user of Kinsumer wants stronger guarantees that all records are processed it should be easy enough to wrap Done() calls to check that they are called in order.
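As an illustration of the wrapping idea, a user-side guard could look something like the sketch below. Done() is only a proposal in this thread, so the done callbacks here are plain func() values standing in for it, and orderGuard and errOutOfOrder are hypothetical names.

```go
package main

import (
	"errors"
	"sync"
)

// errOutOfOrder is returned when a callback runs before an earlier one has.
var errOutOfOrder = errors.New("done called out of order")

// orderGuard wraps done callbacks and rejects any call that would skip past a
// record whose callback has not run yet. The zero value is ready to use.
type orderGuard struct {
	mu     sync.Mutex
	issued int // number of callbacks handed out so far
	called int // number of callbacks that have run, in order
}

// Wrap returns a callback that invokes done only if every callback wrapped
// before it has already run. Otherwise it returns errOutOfOrder and does not
// invoke done. Calling the same callback twice also returns errOutOfOrder.
func (g *orderGuard) Wrap(done func()) func() error {
	g.mu.Lock()
	index := g.issued
	g.issued++
	g.mu.Unlock()
	return func() error {
		g.mu.Lock()
		defer g.mu.Unlock()
		if index != g.called {
			return errOutOfOrder
		}
		g.called++
		done()
		return nil
	}
}
```

Unlike the in-order blocking in this PR, the guard never waits. The caller decides whether to retry later, log the gap, or give up on the record.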

Finally, I hope your break is going well, Gareth. My team has been doing a lot of work around Kinesis and has several internal improvements for our specific use cases (especially around Enhanced Fanout). I'd be happy to chat whenever you're available about them and about anything we can do to help support Kinsumer.

garethlewin commented 3 years ago

I'm still a bit stuck on this. See #62 for an example of something that this makes hard.

matelang commented 3 years ago

Is there any way to control the checkpointing process?

We'd like our consumer to gracefully give up if there is a processing error and let another consumer try the same operation, and honour the at-least-once semantics.