nodefluent / kafka-streams

equivalent to kafka-streams :octopus: for nodejs :sparkles::turtle::rocket::sparkles:
https://nodefluent.github.io/kafka-streams/
MIT License

Mutating message value with version property when using 'buffer' as produceType and native client #56

Closed JRGranell closed 6 years ago

JRGranell commented 6 years ago

Hi,

When producing a JSON object using the 'buffer' produceType and the native client, the message value may be mutated when a version property is not supplied on the message. AFAIK a version on a message value is not required by the Kafka protocol (please correct me if I'm mistaken), so setting version = 1 (https://github.com/nodefluent/kafka-streams/blob/master/lib/messageProduceHandle.js#L39) has an unintended side effect when the message is produced by the native client (https://github.com/nodefluent/node-sinek/blob/master/lib/librdkafka/NProducer.js#L375).

In our implementation we produce and then transform a message a number of times, and we validate its integrity using a hash of the original message. We do not use the version property on the message, so this integrity check fails because kafka-streams / node-sinek adds the version property.

E.g.

const message = {
    key: '1',
    value: {
        name: 'test'
    }
}

const hash = myHashingMethod(message);

....

consumerStream
    .from('my_topic')
    .forEach(msg => {
        // Fails
        const newHash = myHashingMethod(msg);
        expect(newHash).to.eql(hash);
    });

producerStream
    .to({ topic: 'my_topic', producerType: 'buffer' })
    .writeToStream(message);
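
The failing check above can be reproduced without a broker. The following is a minimal sketch, assuming the hash is computed over the JSON text of the message value; `hashValue` is a hypothetical stand-in for `myHashingMethod`, and the added `version: 1` field is simulated by hand rather than produced by kafka-streams or node-sinek. It also shows one possible consumer-side alternative: dropping the added field before hashing.

```javascript
const crypto = require('crypto');

// Hypothetical stand-in for myHashingMethod: SHA-256 over the JSON text of the value.
function hashValue(value) {
    return crypto.createHash('sha256').update(JSON.stringify(value)).digest('hex');
}

const original = { name: 'test' };
const originalHash = hashValue(original);

// Simulates the value as seen by the consumer once a version field has been added.
const received = { ...original, version: 1 };
const receivedHash = hashValue(received);

// Consumer-side alternative: drop the added field before hashing.
const { version, ...withoutVersion } = received;
const strippedHash = hashValue(withoutVersion);

console.log(originalHash === receivedHash); // false
console.log(originalHash === strippedHash); // true
```

Stripping the field on the consumer only helps if the original messages never carry their own version property; otherwise that property would be removed from the hash input as well.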

We are currently migrating from Kafka 0.9 to 1.1 and wanted to ask whether the Kafka protocol requires a version property on the message value when using JSON data.

Thanks,

krystianity commented 6 years ago

Hi @JRGranell, the Kafka protocol in no way requires you to set a version. In this case the version is also not a direct member of the message; it is just a field on the payload of the message's value. This is very important to understand: sinek just provides some convenience formats for JSON object values with its "buffer" formats.

If you don't want the version as a value field, my suggestion is to simply use "send" as the producerType and run JSON.stringify(event) on the message value as the last step before calling to().
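
The suggestion above could look roughly like the following sketch. It only covers the serialization step and deliberately leaves out the kafka-streams calls; `toStringPayload` and `hashString` are hypothetical helpers, and the "consumed" value is simulated by reusing the outgoing string. The assumption, taken from the comment above, is that a string value sent with "send" is not given a version field.

```javascript
const crypto = require('crypto');

// Hypothetical helper: serialize the value so the producer is handed a plain string.
function toStringPayload(message) {
    return { key: message.key, value: JSON.stringify(message.value) };
}

// Hypothetical hash over the serialized text rather than the parsed object.
function hashString(text) {
    return crypto.createHash('sha256').update(text).digest('hex');
}

const message = { key: '1', value: { name: 'test' } };

// Last step before producing: stringify the value and hash the exact text sent.
const outgoing = toStringPayload(message);
const sentHash = hashString(outgoing.value);

// Consumer side (simulated): hash the raw string first, then parse it.
const receivedText = outgoing.value;
const receivedHash = hashString(receivedText);
const parsed = JSON.parse(receivedText);

console.log(sentHash === receivedHash); // true
console.log('version' in parsed);       // false
```

Hashing the serialized text rather than the parsed object also avoids depending on property order after parsing.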