awslabs / amazon-kinesis-client

Client library for Amazon Kinesis
Apache License 2.0

KCL's support of at-least-once semantics #1403

Open matheisco opened 1 week ago

matheisco commented 1 week ago

Hi!

In my application I need to make sure that no record is lost. I'm not sure this is guaranteed in certain error scenarios.

Consider the following case:

  1. KCL calls processRecords() with a batch of records.
  2. The business logic can't handle one of those records (e.g. because another required system is currently down), and therefore tries to write the record to an S3 bucket to avoid losing it.
  3. The write to the S3 bucket fails for some reason.
  4. No explicit checkpointing happens in processRecords().
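Steps 1 to 3 can be modelled without the KCL dependency. This is a minimal sketch under stated assumptions: `SimpleRecord`, `RecordHandler`, `RecordArchiver`, and `FallbackProcessor` are hypothetical names, not KCL types. In a real application the loop would live inside `processRecords()` and `RecordArchiver` would wrap an S3 client. The method returns the records that were neither handled nor archived, which are exactly the records this question is worried about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kinesis record, holding only what this sketch needs.
final class SimpleRecord {
    final String sequenceNumber;
    final String data;

    SimpleRecord(String sequenceNumber, String data) {
        this.sequenceNumber = sequenceNumber;
        this.data = data;
    }
}

// Hypothetical primary business logic; throws when a required system is down.
interface RecordHandler {
    void handle(SimpleRecord rec) throws Exception;
}

// Hypothetical fallback store, e.g. a thin wrapper around an S3 put.
interface RecordArchiver {
    void archive(SimpleRecord rec) throws Exception;
}

final class FallbackProcessor {
    private final RecordHandler handler;
    private final RecordArchiver archiver;

    FallbackProcessor(RecordHandler handler, RecordArchiver archiver) {
        this.handler = handler;
        this.archiver = archiver;
    }

    /** Steps 1-3: handle each record, fall back to the archive, collect records that fit neither path. */
    List<SimpleRecord> processBatch(List<SimpleRecord> batch) {
        List<SimpleRecord> unprocessed = new ArrayList<>();
        for (SimpleRecord rec : batch) {
            try {
                handler.handle(rec);
            } catch (Exception primaryFailure) {
                try {
                    archiver.archive(rec);
                } catch (Exception archiveFailure) {
                    // Step 3: both paths failed, so nothing durable holds this record.
                    unprocessed.add(rec);
                }
            }
        }
        return unprocessed;
    }
}
```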

In this case I want the record to be delivered again in the next processRecords() call (e.g. by replaying the whole previous batch). Is this guaranteed if I don't call checkpoint()?

Some parts of the documentation make me believe that processRecords() skips records from a previous call even if that call did not checkpoint. For example, the periodic checkpointing in this example only makes sense if that's the case. This is problematic IMO, since a subsequent successful processRecords() call would call checkpoint() and thereby also checkpoint the earlier record that wasn't successfully processed.

akidambisrinivasan commented 5 days ago

In this case I want the record to be delivered again in the next processRecords() call (e.g. by replaying the whole previous batch). Is this guaranteed if I don't call checkpoint()?

Whether or not you checkpoint does not affect record delivery. KCL always provides the next batch of records as received from Kinesis.
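A toy model makes this concrete. It assumes a single shard and treats the read position and the checkpoint as two independent counters; `ToyShardConsumer` is a hypothetical in-memory simplification, not how KCL is implemented internally. Its no-argument `checkpoint()` mirrors the documented behaviour of KCL's `checkpoint()`, which records the last record delivered to the processor.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of one shard: read position and checkpoint move independently. */
final class ToyShardConsumer {
    private final List<String> shard;   // sequence numbers, in order
    private final int batchSize;
    private int readPosition = 0;       // index of the next record to deliver
    private int checkpoint = -1;        // index of the last checkpointed record (-1 = none)

    ToyShardConsumer(List<String> shard, int batchSize) {
        this.shard = shard;
        this.batchSize = batchSize;
    }

    /** Delivers the next batch; the read position advances whether or not anyone checkpoints. */
    List<String> nextBatch() {
        int end = Math.min(readPosition + batchSize, shard.size());
        List<String> batch = new ArrayList<>(shard.subList(readPosition, end));
        readPosition = end;
        return batch;
    }

    /** Like a no-argument checkpoint(): marks everything delivered so far as processed. */
    void checkpoint() {
        checkpoint = readPosition - 1;
    }

    /** Index at which a restarted consumer would resume reading. */
    int resumeIndex() {
        return checkpoint + 1;
    }
}
```

If `s2` fails in the first batch and the processor checkpoints after the second batch, the resume point moves to index 4, past `s2`. That is the gap the question describes: the checkpoint covers every delivered record, including the one that failed earlier.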

KCL currently does not provide a way for a single record processor to signal an unrecoverable processing error so that KCL shuts down just that record processor and restarts processing from the last persisted checkpoint sequence number. Today, the only supported way to achieve this is to call System.exit (which brings down the whole worker) if you encounter an unrecoverable error in any record processor. We are aware of this feature request from another GitHub issue.
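A minimal sketch of that workaround, assuming you classify errors yourself: `UnrecoverableProcessingException` and `ExitOnFatalError` are hypothetical names, and the exit action is injected so the behaviour can be exercised without killing the JVM (in production it would be `System::exit`). Because no checkpoint is written for the failed batch, the restarted worker resumes from the last persisted checkpoint and reads that batch again.

```java
import java.util.function.IntConsumer;

/** Hypothetical marker for errors the application cannot recover from. */
final class UnrecoverableProcessingException extends RuntimeException {
    UnrecoverableProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical wrapper: runs batch logic and terminates the process on a fatal error. */
final class ExitOnFatalError {
    static final int FATAL_EXIT_CODE = 1; // illustrative exit status
    private final IntConsumer exit;

    ExitOnFatalError(IntConsumer exit) {
        this.exit = exit;
    }

    /** Production default: really terminate the JVM, taking the whole worker down. */
    static ExitOnFatalError forProduction() {
        return new ExitOnFatalError(System::exit);
    }

    /** Returns true if the batch succeeded; on a fatal error, invokes the exit action and returns false. */
    boolean run(Runnable batchLogic) {
        try {
            batchLogic.run();
            return true;
        } catch (UnrecoverableProcessingException e) {
            // Deliberately no checkpoint here: after restart, processing resumes
            // from the last persisted checkpoint, so this batch is delivered again.
            System.err.println("Fatal processing error, exiting: " + e.getMessage());
            exit.accept(FATAL_EXIT_CODE);
            return false;
        }
    }
}
```

The trade-off is the one stated above: every record processor on the worker stops, not just the one that failed.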

For recoverable errors, you need to keep the delivered records in memory and reprocess them on subsequent deliveries, making sure they have all been processed successfully before invoking any checkpoint.
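One way to apply this advice is sketched below. It is a minimal illustration, not a KCL API: records are plain strings, `tryProcess` is a hypothetical stand-in for your business logic (returning `true` on success), and `checkpointAction` stands in for calling `checkpoint()` on the checkpointer passed to `processRecords()`. Failed records are retried on every later delivery, and the checkpoint is only taken once nothing is pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of "buffer failed records in memory, retry, checkpoint only when clean". */
final class RetryingProcessor {
    private final Predicate<String> tryProcess;  // true = record processed successfully
    private final Runnable checkpointAction;     // stand-in for checkpointer.checkpoint()
    private final List<String> pending = new ArrayList<>(); // failed records, oldest first

    RetryingProcessor(Predicate<String> tryProcess, Runnable checkpointAction) {
        this.tryProcess = tryProcess;
        this.checkpointAction = checkpointAction;
    }

    /** Called once per delivered batch, as processRecords() would be. */
    void onBatch(List<String> batch) {
        // Retry earlier failures first, then the newly delivered records.
        List<String> work = new ArrayList<>(pending);
        work.addAll(batch);
        pending.clear();
        for (String rec : work) {
            if (!tryProcess.test(rec)) {
                pending.add(rec); // keep it for the next delivery
            }
        }
        // Checkpoint only when every record delivered so far has succeeded;
        // otherwise the checkpoint would also cover a record that was never processed.
        if (pending.isEmpty()) {
            checkpointAction.run();
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Two design caveats: the buffer lives only in memory, so if the worker dies or its lease moves before the next checkpoint, the new owner starts from the last checkpoint and some records are processed twice (at-least-once, not exactly-once). And if a failure never clears, the buffer grows and checkpoints stop, so a real implementation would likely cap it and escalate to the unrecoverable-error path.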