tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Consumed messages reference the wrong timestamp #768

Closed: thicks closed this issue 4 years ago

thicks commented 4 years ago

Describe the bug The message timestamp on a message processed by KafkaJS doesn't reflect the value on the raw message as defined by message.timestamp.type.

To Reproduce Create a topic with message.timestamp.type=LogAppendTime and publish a message to it. View the topic in Kafka. I used Control Center and kafkacat, but anything other than KafkaJS will do. Consume the message using a KafkaJS consumer and compare the values.

Expected behavior I expect the message timestamp on the message object to reflect the value on the raw message as defined by message.timestamp.type. I'd like the raw message timestamp to be passed through as the message timestamp.

Observed behavior I've dissected the incoming message and found the correct timestamp in the batchContext element. The decoder pulls up the firstTimestamp, which is not the message timestamp.

Environment:

Additional context Consumer records, since Kafka 0.10.0, support a timestamp (as well as an attribute for the timestamp type): https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html. When using log append time, the timestamp on the message is overwritten when the record is written to the log.
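The broker-side behavior described above can be sketched as follows. This is an illustrative model only, not broker code; the function name and shapes are hypothetical, but the semantics match the message.timestamp.type topic config:

```javascript
// Hedged sketch of broker-side timestamp handling (names hypothetical).
// With message.timestamp.type=LogAppendTime, the broker discards the
// producer-supplied timestamp and stamps the record at append time.
const appendToLog = (record, topicConfig, now = Date.now()) => {
  if (topicConfig['message.timestamp.type'] === 'LogAppendTime') {
    return { ...record, timestamp: now, timestampType: 'LOG_APPEND_TIME' }
  }
  // Default: keep the producer's timestamp (create time)
  return { ...record, timestampType: 'CREATE_TIME' }
}

// Using the values from this report: producer sent ...463, broker appends at ...480
console.log(appendToLog(
  { value: 'ping', timestamp: 1592340538463 },
  { 'message.timestamp.type': 'LogAppendTime' },
  1592340538480
))
// → { value: 'ping', timestamp: 1592340538480, timestampType: 'LOG_APPEND_TIME' }
```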

When reviewing messages in Kafka that have been published via KafkaJS (or any other client) on a topic with message.timestamp.type=LogAppendTime, the messages follow the spec as expected. I can see the expected timestamp and timestamp type in the message. For example, a raw message in Kafka shows:

{
  "topic": "gjp.event.health.check",
  "partition": 5,
  "offset": 110,
  "timestamp": 1592340538480,
  "timestampType": "LOG_APPEND_TIME",
  "headers": [],
  "key": null,
  "value": {…}
}

Where I'm struggling is on the consumer side when using a KafkaJS consumer; other Kafka client libraries behave as expected. My expectation is that the timestamp on the consumed message (once processed by KafkaJS) will reflect the timestamp on the raw message. Instead, an incorrect timestamp is being passed through. I'm not able to determine exactly what that timestamp is, but it seems to be the create or publish time. For example, the message handed off from KafkaJS is:

{
    topic: 'gjp.event.health.check',
    partition: 5,
    offset: '110',
    timestamp: '1592340538463',
    key: null,
    value: [Object],
    headers: {},
    isControlRecord: false,
    batchContext: [Object],
    magicByte: 2,
    attributes: 0,
}
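Incidentally, the two observed values are only milliseconds apart, which is consistent with the suspicion that the value passed through is the create/publish time rather than the log append time. A minimal check, using the numbers from the two dumps above:

```javascript
// Timestamps taken from the two message dumps in this report
const kafkajsTimestamp = 1592340538463 // what the KafkaJS consumer reports
const logTimestamp = 1592340538480 // LOG_APPEND_TIME stored in the log

// The gap is a few milliseconds: plausibly the publish-to-append latency
console.log(logTimestamp - kafkajsTimestamp) // → 17
```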

I've dissected the incoming message and found the correct timestamp in the batchContext element. The correct value is the maxTimestamp, as opposed to the firstTimestamp, which is what is currently being lifted up into the higher-level consumer record. Specifically, here: https://github.com/tulios/kafkajs/blob/0e0dcf7efb49d52ffc13644a5432e9e4efb5229e/src/protocol/recordBatch/record/v0/decoder.js#L19

If the decoder referenced maxTimestamp rather than firstTimestamp, the consumer record timestamp would match the one in the Kafka message.
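One way the decoder could pick the right value is sketched below. This is not the actual kafkajs patch, just an illustration; the helper name is hypothetical, but the timestampType flag is real protocol: bit 3 of the v2 record batch "attributes" field (0 = CreateTime, 1 = LogAppendTime), and for CreateTime each record's timestamp is the batch's firstTimestamp plus the per-record delta:

```javascript
// Bit 3 of the record batch "attributes" field carries the timestamp type
// (Kafka protocol, v2 record batches): 0 = CreateTime, 1 = LogAppendTime.
const TIMESTAMP_TYPE_MASK = 0x8

// Hypothetical helper: resolve a record's timestamp from its batch context
const resolveTimestamp = ({ attributes, firstTimestamp, maxTimestamp, timestampDelta }) => {
  const isLogAppendTime = (attributes & TIMESTAMP_TYPE_MASK) !== 0
  // LogAppendTime: the broker stamps the whole batch, stored as maxTimestamp.
  // CreateTime: each record is firstTimestamp + its delta.
  return isLogAppendTime ? maxTimestamp : firstTimestamp + timestampDelta
}

// With the batch values from this report (delta hypothetical):
console.log(resolveTimestamp({
  attributes: 0x8,
  firstTimestamp: 1592340538463,
  maxTimestamp: 1592340538480,
  timestampDelta: 0,
}))
// → 1592340538480
```

Checking the timestamp-type bit, rather than always using maxTimestamp, keeps CreateTime batches correct too: there each record should still get its own firstTimestamp + delta value.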

Nevon commented 4 years ago

Thanks for the report! This will be fixed in the next version of kafkajs@beta, which will be out as soon as the master build is done.