redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

aws_kinesis input plugin unable to read aggregated records #1683

Open · sivatarunp opened this issue 1 year ago

sivatarunp commented 1 year ago

We use the KPL to insert data into Kinesis, with aggregated records that contain compressed data. With aggregation, multiple user records are stored in a single Kinesis record, and the data is compressed. When reading this data using the aws_kinesis input plugin with the decompress processor enabled, we get an invalid gzip header error and decompression does not happen.
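A plausible explanation for the `invalid gzip header` error: according to the KPL aggregation format, an aggregated Kinesis record starts with the 4-byte magic number `0xF3 0x89 0x9A 0xC2`, whereas every gzip stream starts with `0x1F 0x8B` (RFC 1952). Handing the whole aggregated record to a gzip decompressor would therefore fail at the header check. The Go sketch below shows how a consumer could tell the two cases apart by their leading bytes; `classify` is a hypothetical helper, not part of Benthos or any AWS SDK.

```go
package main

import (
	"bytes"
	"fmt"
)

// kplMagic is the 4-byte prefix of a KPL-aggregated Kinesis record
// (assumption: taken from the KPL aggregation format description).
var kplMagic = []byte{0xF3, 0x89, 0x9A, 0xC2}

// gzipMagic is the ID1/ID2 header that every gzip member starts with (RFC 1952).
var gzipMagic = []byte{0x1F, 0x8B}

// classify is a hypothetical helper that reports what a raw Kinesis
// record payload looks like, based only on its leading bytes.
func classify(payload []byte) string {
	switch {
	case bytes.HasPrefix(payload, kplMagic):
		return "kpl-aggregated"
	case bytes.HasPrefix(payload, gzipMagic):
		return "gzip"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classify([]byte{0xF3, 0x89, 0x9A, 0xC2, 0x0A})) // aggregated record
	fmt.Println(classify([]byte{0x1F, 0x8B, 0x08}))             // plain gzip payload
}
```

Only the prefix is inspected here; the sketch does not validate the rest of the record.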

But when I keep aggregation low during insertion, i.e. when the number of user records equals the number of Kinesis records, I don't see any issue: the data is read and decompressed correctly.

Is this the expected behaviour with aggregated Kinesis records?

Jeffail commented 1 year ago

Hey @sivatarunp, if the received payload contains a concatenated set of gzipped files, then internally we would need to know how to unpack those blobs into individual records before we're able to decompress them. Is there any documentation we can use to understand how aggregated records are expected to work? I'm asking because it sounds like we'd need to implement that logic ourselves, but this is the first I've heard of it.

sivatarunp commented 1 year ago

@Jeffail it's not a concatenated set of gzipped files. We gzip each user record's payload individually and pass it to the KPL (Kinesis Producer Library). The KPL does aggregation internally and modifies the payload accordingly. Normally the KCL (Kinesis Client Library) does the de-aggregation, but since Benthos is written in Go and doesn't use the KCL, we have to incorporate this in our code.

https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
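The de-aggregation step that the KCL performs can be sketched in Go with only the standard library. Per the KPL aggregation format, an aggregated record is the magic prefix `0xF3 0x89 0x9A 0xC2`, then a protobuf-encoded `AggregatedRecord` message, then a 16-byte MD5 digest of that message. The user records are field 3 (`records`) of `AggregatedRecord`, and each user record's bytes are field 3 (`data`) of a `Record`. This is a minimal sketch under those assumptions, not Benthos code: `deaggregate`, `forEachField` and the test helper `buildAggregated` are hypothetical names, and the hand-rolled protobuf walker only handles the varint and length-delimited wire types that the KPL schema uses.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/binary"
	"errors"
	"fmt"
)

// kplMagic prefixes every KPL-aggregated Kinesis record.
var kplMagic = []byte{0xF3, 0x89, 0x9A, 0xC2}

// forEachField walks the top-level fields of a protobuf message and calls fn
// for each length-delimited field; varint fields are skipped.
func forEachField(msg []byte, fn func(num uint64, val []byte) error) error {
	for len(msg) > 0 {
		key, n := binary.Uvarint(msg) // key = field_number<<3 | wire_type
		if n <= 0 {
			return errors.New("malformed field key")
		}
		msg = msg[n:]
		switch key & 7 {
		case 0: // varint: skip the value
			if _, n = binary.Uvarint(msg); n <= 0 {
				return errors.New("malformed varint")
			}
			msg = msg[n:]
		case 2: // length-delimited: strings, bytes, nested messages
			l, n := binary.Uvarint(msg)
			if n <= 0 || l > uint64(len(msg)-n) {
				return errors.New("malformed length-delimited field")
			}
			if err := fn(key>>3, msg[n:n+int(l)]); err != nil {
				return err
			}
			msg = msg[n+int(l):]
		default:
			return fmt.Errorf("unsupported wire type %d", key&7)
		}
	}
	return nil
}

// deaggregate returns the user-record payloads carried by one Kinesis record.
// Payloads without the KPL magic prefix are returned as a single record.
func deaggregate(payload []byte) ([][]byte, error) {
	if !bytes.HasPrefix(payload, kplMagic) {
		return [][]byte{payload}, nil
	}
	if len(payload) < len(kplMagic)+md5.Size {
		return nil, errors.New("aggregated record too short")
	}
	// Layout: magic | protobuf AggregatedRecord | MD5 of the protobuf bytes.
	body := payload[len(kplMagic) : len(payload)-md5.Size]
	if sum := md5.Sum(body); !bytes.Equal(sum[:], payload[len(payload)-md5.Size:]) {
		return nil, errors.New("aggregated record checksum mismatch")
	}
	var out [][]byte
	err := forEachField(body, func(num uint64, rec []byte) error {
		if num != 3 { // AggregatedRecord field 3: repeated Record records
			return nil
		}
		return forEachField(rec, func(num uint64, data []byte) error {
			if num == 3 { // Record field 3: bytes data
				out = append(out, data)
			}
			return nil
		})
	})
	if err != nil {
		return nil, err
	}
	return out, nil
}

// buildAggregated is a hypothetical test helper that hand-encodes a minimal
// aggregated record: one partition key and one Record per data blob.
func buildAggregated(partitionKey string, records ...[]byte) []byte {
	appendLD := func(b []byte, num uint64, v []byte) []byte {
		b = binary.AppendUvarint(b, num<<3|2) // length-delimited field key
		return append(binary.AppendUvarint(b, uint64(len(v))), v...)
	}
	body := appendLD(nil, 1, []byte(partitionKey)) // partition_key_table
	for _, data := range records {
		rec := []byte{1 << 3, 0} // field 1 (varint) partition_key_index = 0
		body = appendLD(body, 3, appendLD(rec, 3, data))
	}
	sum := md5.Sum(body)
	return append(append(append([]byte{}, kplMagic...), body...), sum[:]...)
}

func main() {
	recs, err := deaggregate(buildAggregated("pk", []byte("alpha"), []byte("beta")))
	if err != nil {
		panic(err)
	}
	for _, r := range recs {
		fmt.Println(string(r))
	}
}
```

In the setup described in this issue, each slice returned by `deaggregate` would still be an individually gzipped user record, so decompression would run per element after de-aggregation rather than on the raw Kinesis record. The sketch rejects a checksum mismatch outright; a production version might choose to pass such records through unchanged instead, and an existing library such as awslabs/kinesis-aggregation may be preferable to hand-parsing.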

smazurov commented 1 year ago

Substation does de-aggregation via the awslabs/kinesis-aggregation library. I can't imagine it's too hard to implement in Benthos.

sivatarunp commented 2 months ago

Hi @Jeffail, is any work going on in this space? Is de-aggregation supported currently, or is it on the roadmap?