awslabs / amazon-kinesis-producer

Amazon Kinesis Producer Library
Apache License 2.0
397 stars 330 forks source link

Messeges are getting merged #566

Open IDUN-BogdanPi opened 4 months ago

IDUN-BogdanPi commented 4 months ago

Hi,

I am using Kinesis Producer for Java:

            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>amazon-kinesis-producer</artifactId>
                <version>0.15.8</version>
            </dependency>

This is my send-message code snippet:

            String json = objectMapper.writeValueAsString(msg);
            log.info("Sending notification to user: {}, message :{}", msg.getPartitioningKey(), json);
            byte[] messageBytes = objectMapper.writeValueAsString(msg).getBytes(StandardCharsets.UTF_8);
            ByteBuffer data = ByteBuffer.wrap(messageBytes);
            ListenableFuture<UserRecordResult> future = kinesisProducer.addUserRecord(STREAM_NAME, msg.getPartitioningKey(), data);

            Futures.addCallback(future, new FutureCallback<>() {

The log message produced is:

Sending notification to user: 03eadfed-b2ab-45e8-8b8c-54baa6fd27f3, message :{"idunId":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3","deviceId":"49-44-55-4E-00-01","action":"recordingUpdate","recordingId":"1713264772502","status":"NOT_STARTED","timestamp":1713264772502,"partitioningKey":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3"}

But this is what it ends up as in Kinesis. The problem seems to appear when sending messages with different partitioning keys. They end up "merged". As you can see in the example below, this should've been 2 distinct json messages.

In the log messages, the partitioning key is correctly: 03eadfed-b2ab-45e8-8b8c-54baa6fd27f3 While on Kinesis it seems to be: a

image
����
$03eadfed-b2ab-45e8-8b8c-54baa6fd27f3
$026e854e-4bac-41ae-97b3-3b39c66cef89��{"idunId":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3","deviceId":"49-44-55-4E-00-01","action":"recordingUpdate","recordingId":"1713261592840","status":"PROCESSING","timestamp":1713264766170,"partitioningKey":"03eadfed-b2ab-45e8-8b8c-54baa6fd27f3"}��{"idunId":"026e854e-4bac-41ae-97b3-3b39c66cef89","deviceId":"49-44-55-4E-00-02","action":"liveStreamInsights","recordingId":"1713264702302","partitioningKey":"026e854e-4bac-41ae-97b3-3b39c66cef89","raw_eeg":[-- removed for brevity ---]}DZ    @�_@_+q``}3
image

Any idea what might be the problem?

Thanks for you help!

lbourdages commented 4 months ago

How do you create the Producer? There's a setting about aggregation that is set to true by default.

You can disable it like so:

KinesisProducerConfiguration config = new KinesisProducerConfiguration().setAggregationEnabled(false);

return new KinesisProducer(config);