amazon-archives / amazon-kinesis-connectors

Incorrect bufferByteSizeLimit Calculation for Batched Kinesis Records #44

thatjpk commented 9 years ago

In some cases, data is batched on the client side before it is sent to Kinesis in a PutRecord request. (For instance, the data blob may be a JSON array containing objects that should each be handled separately. I'm doing something like this, and looking to store each object on its own line in a newline-delimited JSON file on S3.) The connector library supports this by providing both the ITransformer and ICollectionTransformer interfaces (a sketch of a collection transformer for this case follows the excerpt below), and the processRecords method in KinesisConnectorRecordProcessor contains the following:

...
if (transformer instanceof ITransformer) {
    ITransformer<T, U> singleTransformer = (ITransformer<T, U>) transformer;
    filterAndBufferRecord(singleTransformer.toClass(record), record);
} else if (transformer instanceof ICollectionTransformer) {
    ICollectionTransformer<T, U> listTransformer = (ICollectionTransformer<T, U>) transformer;
    Collection<T> transformedRecords = listTransformer.toClass(record);
    for (T transformedRecord : transformedRecords) {
        // Each sub-record is buffered against the same original Kinesis Record.
        filterAndBufferRecord(transformedRecord, record);
    }
}
...
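
For the JSON-array case described above, a collection transformer might look roughly like the sketch below (the String type parameters, the class name, and the naive splitting in place of real JSON parsing are all illustrative assumptions, not part of the library); the library's filterAndBufferRecord, quoted next, is then called once per item:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;

import com.amazonaws.services.kinesis.connectors.interfaces.ICollectionTransformer;
import com.amazonaws.services.kinesis.model.Record;

// Hypothetical transformer that splits one batched PutRecord blob (a JSON array)
// into its elements so each object can be emitted on its own line in S3.
public class JsonArrayTransformer implements ICollectionTransformer<String, String> {

    @Override
    public Collection<String> toClass(Record record) throws IOException {
        String blob = new String(record.getData().array(), StandardCharsets.UTF_8);
        Collection<String> items = new ArrayList<String>();
        // Naive split on "}," stands in for real JSON parsing.
        for (String item : blob.replaceAll("^\\s*\\[|\\]\\s*$", "").split("(?<=\\})\\s*,")) {
            items.add(item.trim());
        }
        return items;
    }

    @Override
    public String fromClass(String item) {
        // One object per line for a newline-delimited JSON file.
        return item + "\n";
    }
}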

And filterAndBufferRecord is implemented as:

private void filterAndBufferRecord(T transformedRecord, Record record) {
    if (filter.keepRecord(transformedRecord)) {
        buffer.consumeRecord(transformedRecord, record.getData().array().length, record.getSequenceNumber());
    }
}

Notice that filterAndBufferRecord uses record.getData().array().length as the size of the record. So when transformer is an ICollectionTransformer implementation, each sub-record is passed to buffer.consumeRecord with the byte size of the entire batched record rather than the size of that individual sub-record. If a batch yields N sub-records, the buffer is charged N times the batch's byte size, so the count it tracks against bufferByteSizeLimit is inflated and the buffer flushes far more often than it should.
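
One possible shape for a fix (just a sketch: the extra size parameter and the way the batch's bytes are split across sub-records are my assumptions, not something the library provides) would be to let the caller say how many bytes each buffered item should be charged:

// Sketch: take the byte count explicitly instead of always charging the whole batch.
private void filterAndBufferRecord(T transformedRecord, Record record, int byteCount) {
    if (filter.keepRecord(transformedRecord)) {
        buffer.consumeRecord(transformedRecord, byteCount, record.getSequenceNumber());
    }
}

The ICollectionTransformer branch in processRecords would then do something like:

Collection<T> transformedRecords = listTransformer.toClass(record);
int batchBytes = record.getData().array().length;
// Apportion the batch's bytes so the buffer is charged for the batch roughly once,
// rather than once per sub-record.
int perItemBytes = Math.max(1, batchBytes / Math.max(1, transformedRecords.size()));
for (T transformedRecord : transformedRecords) {
    filterAndBufferRecord(transformedRecord, record, perItemBytes);
}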

My workaround for now is to set bufferByteSizeLimit to Integer.MAX_VALUE and rely on bufferRecordCountLimit to flush the buffer.
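
Concretely, that amounts to something like the following when building the connector configuration (a sketch: the class wrapper, the record-count value, and the constructor usage are illustrative, while the property names are the ones referenced above):

import java.util.Properties;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;

public class WorkaroundConfig {
    public static KinesisConnectorConfiguration build() {
        Properties props = new Properties();
        // Effectively disable byte-based flushing so the inflated byte count can't trigger it...
        props.setProperty("bufferByteSizeLimit", String.valueOf(Integer.MAX_VALUE));
        // ...and flush on record count instead (example value).
        props.setProperty("bufferRecordCountLimit", "1000");
        return new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain());
    }
}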

findchris commented 8 years ago

Wow, nice catch. I'll keep an eye out for this. Hopefully someone from AWS will take a look. @aws-dpt