spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Do not unset the message id and message timestamp from incoming message headers #1191

Closed phillipuniverse closed 5 years ago

phillipuniverse commented 5 years ago

Affects Version(s): 2.2.8.RELEASE

I am using Spring Cloud Stream with a Kafka binder.

I am specifically trying to implement idempotent messages with Spring Integration's @IdempotentReceiver. Part of this is also creating an IdempotentReceiverInterceptor. Here's what I've got for that implementation:

@Bean
public IdempotentReceiverInterceptor idempotencyInterceptor() {
    // Key strategy: use the payload's string form as the metadata store key
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    m -> String.valueOf(m.getPayload())));
}

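MetadataStoreSelector applies that key strategy in its accept method and, when no value strategy is set, falls back to the message timestamp for the stored value: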

@Override
public boolean accept(Message<?> message) {
    String key = this.keyStrategy.processMessage(message);
    Long timestamp = message.getHeaders().getTimestamp();
    String value = (this.valueStrategy != null)
            ? this.valueStrategy.processMessage(message)
            : (timestamp == null ? "0" : Long.toString(timestamp));
    return this.metadataStore.putIfAbsent(key, value) == null;
}

However, KafkaMessageHeaders specifically drops the TIMESTAMP and ID headers on the floor, preventing me from writing functionality that can recognize these duplicate messages: https://github.com/spring-projects/spring-kafka/blob/144042bab92cc075d1fef851c059e35e6adc9944/spring-kafka/src/main/java/org/springframework/kafka/support/converter/KafkaMessageHeaders.java#L39-L41

No matter what, this will null out any headers sent via the original message, remove the ID and also remove the timestamp.

When I tracked back why this was even done in the first place it took me to the resolution for #174. The batch case makes sense; since this essentially combines multiple messages into 1, the original message ids and timestamps don't make sense to put in this consolidated message. But for the single-message case I would consider this a bug as the specific message handling contracts are broken unnecessarily.

phillipuniverse commented 5 years ago

Did a little bit more looking around; I had the wrong initial cause of the problem. Looks like this has happened since this module was originally written. This issue is almost a duplicate of https://stackoverflow.com/questions/55252089/spring-kafka-how-to-fetch-timestamp-event-time-when-message-was-produced, but I would argue that it's still valid in the sense that getTimestamp() from MessageHeaders always returns null, and the original message id does not come back on the response at all.

garyrussell commented 5 years ago

Actually, this was done for performance reasons - generating a UUID is quite expensive and there was a use case for Kafka where this significantly helped throughput.

You can't really use the ID and timestamp for the idempotent receiver because if it's not null it's a new UUID each time a message is mutated.

You should use a custom header or something in the payload to detect duplicates.

garyrussell commented 5 years ago

Since it's Kafka, you can use KafkaHeaders.RECEIVED_TOPIC, KafkaHeaders.RECEIVED_PARTITION_ID and KafkaHeaders.OFFSET to determine a duplicate delivery.
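For illustration, the topic/partition/offset suggestion above could be sketched roughly as follows. This is a minimal, plain-Java sketch and not spring-kafka code: OffsetDuplicateDetector is a hypothetical helper, the ConcurrentHashMap is an in-memory stand-in for a metadata store, and the literal header names are assumed to be the values behind KafkaHeaders.RECEIVED_TOPIC, KafkaHeaders.RECEIVED_PARTITION_ID and KafkaHeaders.OFFSET in the 2.2.x line.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration; not part of spring-kafka or Spring Integration.
class OffsetDuplicateDetector {

    // Assumed literal values of KafkaHeaders.RECEIVED_TOPIC,
    // KafkaHeaders.RECEIVED_PARTITION_ID and KafkaHeaders.OFFSET (2.2.x).
    static final String TOPIC = "kafka_receivedTopic";
    static final String PARTITION = "kafka_receivedPartitionId";
    static final String OFFSET = "kafka_offset";

    // In-memory stand-in for a metadata store; a real one would be persistent.
    private final ConcurrentHashMap<String, String> seen = new ConcurrentHashMap<>();

    // Builds a key that identifies one record position, e.g. "orders-0@42".
    static String key(Map<String, Object> headers) {
        return headers.get(TOPIC) + "-" + headers.get(PARTITION) + "@" + headers.get(OFFSET);
    }

    // Returns true the first time a topic/partition/offset is seen, false on redelivery.
    boolean accept(Map<String, Object> headers) {
        return seen.putIfAbsent(key(headers), "seen") == null;
    }
}
```

In a Spring Integration setup, the same key expression would presumably be supplied as the key strategy passed to MetadataStoreSelector, reading the values from message.getHeaders() instead of a plain map.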

garyrussell commented 5 years ago

The timestamp in that stack overflow question is different; that is a Kafka timestamp and it is provided in KafkaHeaders.RECEIVED_TIMESTAMP.
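As a rough sketch of reading that Kafka timestamp, the helper below mirrors the "0" fallback from the accept method quoted earlier, but takes the value from the Kafka header rather than MessageHeaders.TIMESTAMP. ReceivedTimestampValue is a hypothetical name, and the literal header name is assumed to be the value of KafkaHeaders.RECEIVED_TIMESTAMP.

```java
import java.util.Map;

// Hypothetical helper for illustration; not part of spring-kafka.
class ReceivedTimestampValue {

    // Assumed literal value of KafkaHeaders.RECEIVED_TIMESTAMP.
    static final String RECEIVED_TIMESTAMP = "kafka_receivedTimestamp";

    // Returns the Kafka record timestamp as a string, or "0" when the header is absent.
    static String valueFor(Map<String, Object> headers) {
        Object timestamp = headers.get(RECEIVED_TIMESTAMP);
        return (timestamp instanceof Long) ? Long.toString((Long) timestamp) : "0";
    }
}
```

Logic like this could plausibly serve as a value strategy for MetadataStoreSelector, so the stored value no longer depends on the Spring message timestamp being present.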

phillipuniverse commented 5 years ago

Got it, understood. Sorry for polluting the issue tracker, in hindsight this would have been better for SO.

The summary I suppose is that with the Kafka consumer the incoming Message timestamp and id will always be null, along with any other headers that I put in the original production of my Message?

garyrussell commented 5 years ago

The MessageHeaders.TIMESTAMP and ID fields have nothing to do with Kafka; they are Spring Integration headers.

Yes, all other headers will be propagated but as I said, you should use topic/partition/offset to detect duplicates.

phillipuniverse commented 5 years ago

I’m trying to divorce my particular implementation from Kafka-specific code as much as possible which is why I’m using Spring Cloud Stream and the Spring Integration Message abstraction in the first place.

Those headers do have something to do with this particular library, which has functionality to transport a Message through Kafka, but yes, nothing to do with Kafka itself.

Anyway, thanks for the information. Next time I’ll start with SO for clarification-like issues!