awslabs / kinesis-aggregation

AWS libraries/modules for working with Kinesis aggregated record data
Apache License 2.0

Deaggregating an `AggRecord` object python #56

Open AlexEshoo opened 6 years ago

AlexEshoo commented 6 years ago

I would like to be able to aggregate and deaggregate a record within the same application. I tried the following:

```python
from aws_kinesis_agg.aggregator import RecordAggregator
from aws_kinesis_agg.deaggregator import deaggregate_records
import uuid
import json

aggregator = RecordAggregator()

result = None
while result is None:
    record = {"key": "value"}
    rec = json.dumps(record)
    result = aggregator.add_user_record(str(uuid.uuid4()), rec)

raw_record = result.get_contents()[-1]
records = deaggregate_records(raw_record)
```

But it fails with an exception:

```
Traceback (most recent call last):
  File "example.py", line 15, in <module>
    records = deaggregate_records(raw_record)
  File "/home/alexeshoo/repo/env/lib/python3.6/site-packages/aws_kinesis_agg/deaggregator.py", line 111, in deaggregate_records
    return_records.extend(iter_deaggregate_records(records))
  File "/home/alexeshoo/repo/env/lib/python3.6/site-packages/aws_kinesis_agg/deaggregator.py", line 133, in iter_deaggregate_records
    raw_data = r['kinesis']['data']
TypeError: 'int' object is not subscriptable
```

Is this possible somehow?

ghost commented 6 years ago

I'll try to take a look when I get a chance, but in the meantime, I think this unit test does exactly the thing you're trying to do:

https://github.com/awslabs/kinesis-aggregation/blob/master/python/test/test_end_to_end.py#L101

Let me know if you spot the difference. Otherwise I'll try to take a look later on.

ghost commented 6 years ago

OK, I had a bit more time to look...

The issue is that `deaggregate_records` is not set up to take a byte string, which is what the code is currently passing it. It expects a list of Kinesis record JSON objects, in the form an AWS Lambda function receives them in its event (see https://github.com/awslabs/kinesis-aggregation/blob/master/python/lambda_function.py#L21 for an example). You can see a sample of this JSON format here: https://github.com/awslabs/kinesis-aggregation/blob/master/python/test/test_end_to_end.py#L25
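
The `TypeError` in the traceback follows from how Python 3 iterates over `bytes`: each element is an `int`, so indexing an element with `['kinesis']` fails immediately. The minimal sketch below reproduces this with a hypothetical `extract_data` loop that only imitates the lookup shown in the traceback; it is not the library's actual code.

```python
def extract_data(records):
    """Hypothetical stand-in for the lookup in the traceback: treat every
    element of `records` as a Lambda-style Kinesis record."""
    return [r['kinesis']['data'] for r in records]


raw_record = b"\x00\x01"  # arbitrary bytes standing in for an aggregated record

# Iterating over bytes yields ints, not dicts
print(type(next(iter(raw_record))))  # <class 'int'>

try:
    extract_data(raw_record)
except TypeError as exc:
    print(exc)  # 'int' object is not subscriptable

# A list of Lambda-style record dicts has the shape the lookup expects
print(extract_data([{"kinesis": {"data": "ZGF0YQ=="}}]))  # ['ZGF0YQ==']
```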

Something like this should make the code work:

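```python
from aws_kinesis_agg.aggregator import RecordAggregator
from aws_kinesis_agg.deaggregator import deaggregate_records
import base64
import uuid
import json

aggregator = RecordAggregator()

# add_user_record() returns None until the current aggregated record is full,
# then returns the completed AggRecord
result = None
while result is None:
    record = {"key": "value"}
    rec = json.dumps(record)
    result = aggregator.add_user_record(str(uuid.uuid4()), rec)

# get_contents() returns (partition_key, explicit_hash_key, data);
# the last element is the aggregated record as a byte string
raw_record = result.get_contents()[-1]

# Wrap the bytes in a Lambda-style Kinesis record with base64-encoded data.
# NOTE: There are more fields, but these are the only required ones.
record_list = [{
    "kinesis": {
        "data": base64.b64encode(raw_record),
        "kinesisSchemaVersion": "1.0",
        "sequenceNumber": "0",
    }
}]
records = deaggregate_records(record_list)
```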
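
To get the original JSON back out of `records`, each user record's payload has to be decoded again. This assumes, as the repo's lambda_function.py example appears to do, that each deaggregated record keeps the Lambda-style shape with base64-encoded data under `['kinesis']['data']`. The stdlib-only sketch below uses a hypothetical `decode_user_records` helper and a hand-built input that only mimics that shape; it is not real library output.

```python
import base64
import json


def decode_user_records(user_records):
    """Hypothetical helper: base64-decode each Lambda-style record's data
    field and parse the result as JSON."""
    payloads = []
    for user_record in user_records:
        raw = base64.b64decode(user_record["kinesis"]["data"])
        payloads.append(json.loads(raw))  # json.loads accepts bytes on Python 3.6+
    return payloads


# Hand-built input mimicking the assumed shape of deaggregated records
sample = [{"kinesis": {"data": base64.b64encode(json.dumps({"key": "value"}).encode("utf-8"))}}]
print(decode_user_records(sample))  # [{'key': 'value'}]
```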

I hope that helps!

AlexEshoo commented 6 years ago

Thanks for the reply; that makes sense. I will try that out.

I do think it would make sense for the deaggregation method to recognize the aggregated records this module generates and handle them as well.
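
Recognizing the module's own output seems feasible because the KPL aggregated format is self-describing: per the KPL aggregation format documentation, a record starts with the 4-byte magic number `0xF3 0x89 0x9A 0xC2`, followed by a protobuf-encoded message and a 16-byte MD5 digest of that message. The sketch below checks only that framing and does not parse the protobuf; `looks_aggregated` is a hypothetical name, not part of `aws_kinesis_agg`.

```python
import hashlib

# Assumed KPL aggregation framing: magic prefix + protobuf body + MD5(body)
KPL_MAGIC = b"\xf3\x89\x9a\xc2"
DIGEST_SIZE = hashlib.md5().digest_size  # 16 bytes


def looks_aggregated(data):
    """Hypothetical check: True if `data` has the KPL magic prefix and a
    trailing MD5 digest that matches the bytes in between."""
    if not isinstance(data, (bytes, bytearray)):
        return False
    if len(data) < len(KPL_MAGIC) + DIGEST_SIZE or not data.startswith(KPL_MAGIC):
        return False
    body = data[len(KPL_MAGIC):-DIGEST_SIZE]
    return hashlib.md5(body).digest() == data[-DIGEST_SIZE:]


# Frame placeholder bytes (not a real protobuf message) the same way
body = b"placeholder-protobuf-bytes"
framed = KPL_MAGIC + body + hashlib.md5(body).digest()
print(looks_aggregated(framed))               # True
print(looks_aggregated(b'{"key": "value"}'))  # False
```

A deaggregation entry point could use a check like this to decide whether a raw byte string should be unpacked or passed through unchanged.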

ghost commented 6 years ago

As I was writing that code, I could definitely see how what you wrote originally seemed like it should work. It's not the most straightforward API by any means.

I think the history is that the Kinesis Client Library (KCL) and the Kinesis Multilang Daemon already existed to do the direct translation, so this package initially focused on Lambda-specific situations where the KCL might not be applicable.

That being said, I think I'm with you. There's no reason we couldn't add wrapper functions that do exactly what my code sample did to your original one.
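
One possible shape for such a wrapper, sketched under assumptions: a hypothetical `to_lambda_records` that does what the earlier workaround did by hand, wrapping raw aggregated bytes in the minimal Lambda-style record list (`data` base64-encoded, plus `kinesisSchemaVersion` and `sequenceNumber`). It uses only the standard library so it can be checked on its own; nothing by this name exists in `aws_kinesis_agg`.

```python
import base64


def to_lambda_records(aggregated_data, sequence_number="0"):
    """Hypothetical wrapper: wrap raw aggregated bytes (for example the last
    element of AggRecord.get_contents()) in the minimal Lambda-style record
    list used in the workaround earlier in this thread."""
    if not isinstance(aggregated_data, (bytes, bytearray)):
        raise TypeError("expected bytes, got %s" % type(aggregated_data).__name__)
    return [{
        "kinesis": {
            "data": base64.b64encode(aggregated_data),
            "kinesisSchemaVersion": "1.0",
            "sequenceNumber": sequence_number,
        }
    }]


records = to_lambda_records(b"\xf3\x89\x9a\xc2example")
print(records[0]["kinesis"]["sequenceNumber"])  # 0
```

With the library installed, usage would look like `deaggregate_records(to_lambda_records(result.get_contents()[-1]))`. Keeping the wrapper separate leaves the existing Lambda-oriented `deaggregate_records` signature unchanged.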

Do you think we should keep this issue open or create a new one?

AlexEshoo commented 6 years ago

I think we should just keep this issue open; a new issue would be superfluous.