awslabs / kinesis-aggregation

AWS libraries/modules for working with Kinesis aggregated record data
Apache License 2.0

AggRecord does not validate record size correctly #141

Open ferozed opened 3 years ago

ferozed commented 3 years ago

In the following code:

    public boolean addUserRecord(String partitionKey, String explicitHashKey, byte[] data) {
        // set the explicit hash key for the message to the partition key -
        // required for encoding
        explicitHashKey = explicitHashKey != null ? explicitHashKey : createExplicitHashKey(partitionKey);

        // validate values from the provided message
        validatePartitionKey(partitionKey);
        validateExplicitHashKey(explicitHashKey);
        validateData(data);

        // Validate new record size won't overflow max size for a
        // PutRecordRequest
        int sizeOfNewRecord = calculateRecordSize(partitionKey, explicitHashKey, data);
        if (getSizeBytes() + sizeOfNewRecord > MAX_BYTES_PER_RECORD) {
            return false;
        }
        // ... (remainder of method omitted)

it validates each of the partition key, explicitHashKey, and data separately, but it does not validate up front that the sum of all of them is still less than MAX_BYTES_PER_RECORD.

This test case shows the problem:

package com.zillow.amazonaws.kinesis.producer;

import org.junit.Assert;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.security.SecureRandom;

public class PutRecordsRequestEntryInformationTest
{
    @Test
    public void test()
    {
        SecureRandom random = new SecureRandom();
        byte [] buffer = new byte[1*1024*1024];
        random.nextBytes(buffer);
        String key = "key";

        try {
            PutRecordsRequestEntryInformation record = new PutRecordsRequestEntryInformation(
                    key,
                    ByteBuffer.wrap(buffer)
            );
            System.out.printf("%d\n", record.sizeInBytes());
            Assert.assertFalse("Expected exception", true);
        } catch (Exception e) {

        }

        buffer = new byte[1*1024*1024 - 20];
        random.nextBytes(buffer);
        try {
            PutRecordsRequestEntryInformation record = new PutRecordsRequestEntryInformation(
                    key,
                    ByteBuffer.wrap(buffer)
            );
            System.out.printf("%d\n", record.sizeInBytes());
            Assert.assertTrue("Expected exception", true);
        } catch (Exception e) {
            e.printStackTrace(System.err);
            Assert.assertTrue("Unexpected exception", false);
        }
    }
}

The first one is expected to fail. The second one should succeed, but it doesn't.

IanMeyers commented 3 years ago

Hello,

I can't really tell what the test is doing because I don't have access to the PutRecordsRequestEntryInformation class. However, the code lines that you cite are just structural checks on each of the partition key, EHK, and data buffer separately. The total message size is then checked via the calculateRecordSize() method, which computes the incremental size based on whether a partition key is used vs. an EHK, and whether those objects already exist in the protobuf index. If you can include more detail from your test, we might be able to find the issue.

Thx!
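
For context, here is a rough conceptual sketch of what such an incremental size estimate looks like. This is illustrative only: the class name, the overhead constants, and the table handling are assumptions, not the library's actual calculateRecordSize() implementation. The key point is that a partition key or EHK already present in the protobuf key table only costs a small index reference, while a new one also adds its UTF-8 bytes, so the headroom left for the payload is not a fixed number.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of an incremental aggregate-size estimate. The per-field
// overhead constants are rough placeholders, not the library's exact accounting.
public class AggregateSizeEstimator {
    private static final int MAX_BYTES_PER_RECORD = 1024 * 1024; // 1 MB Kinesis record limit

    private final List<String> partitionKeyTable = new ArrayList<>();
    private final List<String> explicitHashKeyTable = new ArrayList<>();
    private int estimatedSizeBytes = 0;

    /** Estimate how many bytes adding this user record would contribute to the aggregate. */
    public int estimateIncrementalSize(String partitionKey, String explicitHashKey, byte[] data) {
        int size = 0;
        // A key already present in the table is referenced by a small index;
        // a new key also adds its UTF-8 bytes plus some protobuf field framing.
        if (!partitionKeyTable.contains(partitionKey)) {
            size += partitionKey.getBytes(StandardCharsets.UTF_8).length + 4;
        }
        if (explicitHashKey != null && !explicitHashKeyTable.contains(explicitHashKey)) {
            size += explicitHashKey.getBytes(StandardCharsets.UTF_8).length + 4;
        }
        // Payload bytes plus per-record framing (length prefixes, tags, key indexes).
        size += data.length + 8;
        return size;
    }

    /** True if the record would still fit under the ceiling after adding it. */
    public boolean wouldFit(String partitionKey, String explicitHashKey, byte[] data) {
        return estimatedSizeBytes + estimateIncrementalSize(partitionKey, explicitHashKey, data)
                <= MAX_BYTES_PER_RECORD;
    }

    /** Account for a record that was added: update the key tables and the running total. */
    public void accept(String partitionKey, String explicitHashKey, byte[] data) {
        estimatedSizeBytes += estimateIncrementalSize(partitionKey, explicitHashKey, data);
        if (!partitionKeyTable.contains(partitionKey)) {
            partitionKeyTable.add(partitionKey);
        }
        if (explicitHashKey != null && !explicitHashKeyTable.contains(explicitHashKey)) {
            explicitHashKeyTable.add(explicitHashKey);
        }
    }
}

With something like this, a caller could check wouldFit(...) before handing a payload to the aggregator, which is essentially the clarity the reporter is asking the library itself to surface.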

ferozed commented 3 years ago

Sorry for the bad snippet. Here is the correct one:

    @Test
    public void recordAggregatortest()
    {
        SecureRandom random = new SecureRandom();
        byte [] buffer = new byte[1*1024*1024];
        random.nextBytes(buffer);
        String key = "key";

        try {
            RecordAggregator aggregator = new RecordAggregator();
            aggregator.addUserRecord(
                    key,
                    buffer
            );
            System.out.printf("%d\n", aggregator.getSizeBytes());
            Assert.assertFalse("Expected exception", true);
        } catch (Exception e) {

        }

        int EXPECTED_MAX = 1024*1024 - 40;

        buffer = new byte[EXPECTED_MAX];
        random.nextBytes(buffer);
        try {
            RecordAggregator aggregator = new RecordAggregator();
            aggregator.addUserRecord(
                    key,
                    buffer
            );
            System.out.printf("%d\n", aggregator.getSizeBytes());
            Assert.assertFalse("Expected exception", true);
        } catch (Exception e) {
            e.printStackTrace(System.err);
            Assert.assertTrue("Unexpected exception", false);
        }
    }

So my issue is not that this isn't working... it is. But it is confusing, because the payload passes the validateData check and then fails later, once the real serialized byte size is computed.

Fine... but at least give a better exception that helps the user. Something like:

The payload size exceeds the available limit for this partitionKey and explicitHashKey. Payload size is limited to <N> bytes
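
One possible shape for that, sketched against the size check quoted at the top of this issue. This is hypothetical: the method currently just returns false at this point, and changing that contract would ripple into RecordAggregator, so this only illustrates the kind of message that would help, not a proposed patch.

// Inside AggRecord.addUserRecord(...) - hypothetical, more descriptive failure.
int sizeOfNewRecord = calculateRecordSize(partitionKey, explicitHashKey, data);
if (getSizeBytes() + sizeOfNewRecord > MAX_BYTES_PER_RECORD) {
    throw new IllegalArgumentException(String.format(
            "User record would serialize to %d bytes (payload %d bytes plus partition key, "
                    + "explicit hash key and protobuf overhead), but only %d of the %d-byte "
                    + "record limit remain in this aggregate.",
            sizeOfNewRecord, data.length,
            MAX_BYTES_PER_RECORD - getSizeBytes(), MAX_BYTES_PER_RECORD));
}
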
IanMeyers commented 3 years ago

Unfortunately we can't simply track and error on the data size alone, as the MD5 message digest will change as records are added, so there isn't a single allowable length. We can certainly provide better error messages, however.
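
In the meantime, a caller-side workaround can be sketched as follows. Assumptions here: the 1 MB ceiling named MAX_BYTES_PER_RECORD in the quoted code, and a deliberately generous, hypothetical overhead margin, since the exact serialized overhead varies with key reuse. The idea is to reject oversized payloads up front with an explanatory message rather than waiting for the size check inside addUserRecord.

import java.nio.charset.StandardCharsets;

public class PayloadGuard {
    // 1 MB Kinesis record ceiling, matching MAX_BYTES_PER_RECORD in AggRecord.
    private static final int MAX_BYTES_PER_RECORD = 1024 * 1024;
    // Hypothetical safety margin for the partition key, EHK and protobuf framing;
    // the exact overhead varies with key reuse, so this is deliberately generous.
    private static final int ASSUMED_OVERHEAD_MARGIN = 256;

    /** Fail fast with an explanatory message instead of a bare false/exception later. */
    public static void checkPayloadFits(String partitionKey, byte[] data) {
        int keyBytes = partitionKey.getBytes(StandardCharsets.UTF_8).length;
        int budget = MAX_BYTES_PER_RECORD - ASSUMED_OVERHEAD_MARGIN - keyBytes;
        if (data.length > budget) {
            throw new IllegalArgumentException(String.format(
                    "Payload of %d bytes with partition key '%s' (%d bytes) exceeds the assumed "
                            + "usable budget of %d bytes out of the %d-byte record limit.",
                    data.length, partitionKey, keyBytes, budget, MAX_BYTES_PER_RECORD));
        }
    }
}

A caller would invoke PayloadGuard.checkPayloadFits(partitionKey, data) just before AggRecord.addUserRecord(partitionKey, null, data) and treat the exception as a signal to split or otherwise shrink the payload.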