awslabs / kinesis-aggregation

AWS libraries/modules for working with Kinesis aggregated record data
Apache License 2.0

Is data loss possible when consuming aggregated records with AWS Lambda? #175

Open LilTwo opened 1 year ago

LilTwo commented 1 year ago

I've read the following warning on this page:

Caution - this module is only suitable for low-value messages which are processed in aggregate. Do not use Kinesis Aggregation for data which is sensitive or where every message must be delivered, and where the KCL (including with AWS Lambda) is used for processing. DATA LOSS CAN OCCUR.

But as far as I know, records are not consumed by the KCL in Lambda and need to be deaggregated manually by invoking deaggregate. Is data loss still possible in this case?

IanMeyers commented 1 year ago

Hello - if you are consuming data created by the Kinesis Producer Library and published onto Kinesis Data Streams, then you will be fine. The data loss issue occurs if you use the Aggregation Library from outside of the KPL and then try to publish messages. We'll update the documentation to make this more clear.

LilTwo commented 1 year ago

Hi @IanMeyers, thanks for the reply. I am producing data by calling the putRecord API directly, not via the KPL; the data is aggregated by aggregate (https://github.com/awslabs/kinesis-aggregation/blob/master/node/lib/kpl-agg.js#L446) before putting it into the stream, and it is consumed by AWS Lambda. Does that mean data loss could happen? If so, what's the cause?

IanMeyers commented 1 year ago

OK - so in this case, if you are using stream autoscaling or scaling the stream manually, then it is possible that aggregated records will target Shards that don't exist. Therefore, when you call PutRecords you will need to process the failed records that come back, and those will then have to be re-aggregated to target their correct Shards. This means that you have to retain the entire record base, without aggregation, until all of your records have been flushed to the Stream.
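
For illustration, a minimal sketch of that redrive loop, assuming the aws-sdk v2 Kinesis client. buildAggregatedEntries is a hypothetical helper standing in for however you group the retained raw records into aggregated PutRecords entries:

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

async function putAggregatedWithRedrive(rawRecords, streamName) {
  // Retain the raw, un-aggregated records behind each aggregated entry
  // until the whole batch has been flushed to the Stream.
  // buildAggregatedEntries is a hypothetical helper returning
  // [{ entry: { Data, PartitionKey }, rawRecords: [...] }, ...].
  let batches = buildAggregatedEntries(rawRecords);
  while (batches.length > 0) {
    const resp = await kinesis.putRecords({
      StreamName: streamName,
      Records: batches.map((b) => b.entry)
    }).promise();
    if (resp.FailedRecordCount === 0) {
      return;
    }
    // Gather the raw records behind every failed entry and re-aggregate
    // them, so the new blobs can target the Stream's current Shards.
    const failedRaw = [];
    resp.Records.forEach((result, i) => {
      if (result.ErrorCode) {
        failedRaw.push(...batches[i].rawRecords);
      }
    });
    batches = buildAggregatedEntries(failedRaw);
  }
}

The essential point is that batches is always rebuilt from the raw records, never by replaying the failed aggregated blobs themselves.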

LilTwo commented 1 year ago

Hi @IanMeyers, much appreciated! I'm still a little confused. If my producer is like this:

const AWS = require('aws-sdk');
const agg = require('aws-kinesis-agg');

const kinesis = new AWS.Kinesis();
const name = 'my-stream'; // stream name placeholder

agg.aggregate([
  {
    data: '123',
    partitionKey: 'a'
  },
  {
    data: '456',
    partitionKey: 'b'
  }
], (d) => {
  // Put the encoded (aggregated) record onto the stream
  kinesis.putRecord({
    Data: d.data,
    PartitionKey: d.partitionKey,
    StreamName: name
  }).promise().then(console.log)
})

As I understand it, the PartitionKey in putRecord is the only thing that determines which shard the record will be sent to, even after the stream is scaled. Isn't the only difference that the same partition key could lead to a different shard (a child shard)?

IanMeyers commented 1 year ago

Correct. Let's say (for example) that your Stream only has 1 Shard, and you perform aggregation. The encoded Protobuf message will have an ExplicitHashKey that will resolve to a put onto the single Shard. However, if the Stream mutates after you performed aggregation, then it's possible that your single Protobuf message should have been dispatched to 2 separate shards with 2 separate hash keys. In this case, your PutRecords request should return a set of Failed records which have to be retried. To redrive those, you will have to re-aggregate them using the original record image. If you do not, then when a KCL client deserializes the Protobuf message, the records that didn't map to the source Shard will be dropped silently. To address this you have to either use the KPL for puts, or only use non-KCL based consumers.
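
For illustration only (this is a sketch of the mechanism, not the KCL's actual code): a KCL-style deaggregator keeps only the sub-records whose effective hash key falls within the hash key range of the Shard the aggregated blob actually landed on, so anything outside that range vanishes without an error:

// userRecords: deaggregated sub-records, each carrying the hash key that
// was computed for it at aggregation time. shard: a Shard description as
// returned by DescribeStream, including its HashKeyRange.
function filterToShardRange(userRecords, shard) {
  const lo = BigInt(shard.HashKeyRange.StartingHashKey);
  const hi = BigInt(shard.HashKeyRange.EndingHashKey);
  return userRecords.filter((r) => {
    const hashKey = BigInt(r.explicitHashKey);
    // Sub-records outside the Shard's range are discarded with no error
    // surfaced - the silent data loss described above.
    return hashKey >= lo && hashKey <= hi;
  });
}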

LilTwo commented 1 year ago

ok thanks, that resolves my question! I think the paragraph I quoted in the issue is quite misleading, since AWS Lambda is not a KCL client and should not have the problem we're discussing here.

IanMeyers commented 1 year ago

I've been trying to get specific validation on this issue, but the Kinesis Lambda Event Source does use the KCL internally, and so may display this behaviour.

slootjes commented 7 months ago

@IanMeyers does this mean this tool is not safe to use when consuming Kinesis Data Streams from Lambda? I wanted to use this in combination with a QLDB stream, but I'm not sure anymore if this is a good idea, as it's crucial that I get all records from the stream. More insights are highly appreciated, thanks :)

IanMeyers commented 7 months ago

Consumption is OK when you are just using the deaggregate methods. The only risk with this module is when you are publishing through the aggregate API.
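
For example, a minimal consume-side Lambda handler, assuming the node module's deaggregateSync entry point and that each user record's data arrives base64-encoded like the raw Kinesis event payload:

const deagg = require('aws-kinesis-agg');

exports.handler = async (event) => {
  for (const record of event.Records) {
    // Only deaggregate - no aggregation happens on this side, so the
    // publish-time risk discussed above does not apply.
    deagg.deaggregateSync(record.kinesis, true, (err, userRecords) => {
      if (err) {
        throw err;
      }
      for (const r of userRecords) {
        const payload = Buffer.from(r.data, 'base64').toString();
        console.log(r.partitionKey, payload);
      }
    });
  }
};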

slootjes commented 7 months ago

Great, thanks for confirming!

jshlbrd commented 5 months ago

This part of the warning could use some clarification:

This can result in data loss unless Kinesis Aggregation is carefully designed.

The suggestion described in this issue is to check for failed records, then re-aggregate and send. If all records in the aggregated record have the same partition key, does that also mitigate the problem (if the EHK is never provided in the protobuf)?