spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka

b3 header is not propagated for @KafkaListener using @RetryableTopic #1854

Open · deepesh-verma opened this issue 3 years ago

deepesh-verma commented 3 years ago

The b3 header generated by spring-cloud-sleuth for Kafka messages is not propagated to retry topics. When the same message is sent to a retry topic after an exception, it carries a new b3 header.

Please see the whenMessageSendWithB3HeaderToRetryableTopic_sameHeaderShouldBeCarriedForward test case below:

import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.junit.jupiter.api.Test;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.context.EmbeddedKafka;

import lombok.extern.slf4j.Slf4j;

import static org.assertj.core.api.Assertions.assertThat;

@Slf4j
@SpringBootTest(classes = TestApplication.class)
@EmbeddedKafka(
        topics = {
            RetryableTopicIntegrationTest.MAIN_RETRYABLE_TOPIC
        },
        partitions = 1)
class RetryableTopicIntegrationTest {

    static final String MAIN_RETRYABLE_TOPIC = "main-retryable-topic";

    private static final String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory";
    private static final String CUSTOM_TRACE_ID = "custom-trace-id";
    public static final String TRACE_ID = "traceId";

    String mainTopicTraceId;
    String retryTopicTraceId;

    private final CountDownLatch retryableTopicLatch = new CountDownLatch(1);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void whenMessageSendWithB3HeaderToRetryableTopic_sameHeaderShouldBeCarriedForward() throws InterruptedException {

        // given
        final ProducerRecord<String, String> record = new ProducerRecord<>(MAIN_RETRYABLE_TOPIC, "Test message");
        record.headers().add(CUSTOM_TRACE_ID, "12345".getBytes());

        // when
        log.info("Sending message :{}", record);
        kafkaTemplate.send(record);
        retryableTopicLatch.await(60, TimeUnit.SECONDS);

        // then
        assertThat(retryableTopicLatch.getCount()).isZero();
        log.info("Retry topic traceId: {}, Main topic traceId: {}", retryTopicTraceId, mainTopicTraceId);
        assertThat(retryTopicTraceId).isEqualTo(mainTopicTraceId);
    }

    @RetryableTopic(dltStrategy = DltStrategy.NO_DLT, attempts = "2")
    @KafkaListener(id = "test-consumer", topics = MAIN_RETRYABLE_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
    public void listen(ConsumerRecord<String, String> consumerRecord) {

        log.info("Received: {}", consumerRecord);
        Headers headers = consumerRecord.headers();

        String traceId = MDC.get(TRACE_ID);
        if (Objects.nonNull(headers.lastHeader(KafkaHeaders.ORIGINAL_TOPIC))) {
            this.retryTopicTraceId = traceId;
            retryableTopicLatch.countDown();
        } else {
            mainTopicTraceId = traceId;
        }

        throw new RuntimeException("Woooops... in topic with consumerRecord: " + consumerRecord);
    }

}

Complete test class: https://github.com/deepesh-verma/spring-kafka-sample/blob/main/src/test/java/dev/deepesh/springkafkasample/RetryableTopicIntegrationTest.java
Repository with complete code: https://github.com/deepesh-verma/spring-kafka-sample

tomazfernandes commented 3 years ago

Hello @deepesh-verma, nice to see you again! That's a very interesting issue, thanks. TBH I don't know much about b3 headers, but here's what I thought about this.

Since sleuth uses thread-based MDC for the headers, I think it's expected that they won't be propagated to a new message that'll be consumed by a different thread, so I guess this is more an enhancement than a bug, right?
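
For illustration, here's a minimal plain-Java sketch (hypothetical code, not sleuth internals) of why a value put into the MDC on one thread isn't visible on another thread, such as a listener container thread:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.MDC;

public class MdcIsThreadLocalDemo {

    public static void main(String[] args) throws Exception {
        MDC.put("traceId", "12345");
        System.out.println("producer thread: " + MDC.get("traceId")); // 12345

        // A pooled thread (like a listener container thread) has its own MDC map,
        // so the value set above is absent here (prints null with recent
        // Logback/Log4j2 MDC adapters, which don't inherit across threads).
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> System.out.println("consumer thread: " + MDC.get("traceId"))).get();
        pool.shutdown();
    }
}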

The solution I've used in the past for this kind of problem is to wrap the original message in an envelope that carries the ThreadLocal values, and then, on consumption, restore those ThreadLocal values and unwrap the envelope so the user receives the original message.
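
A rough sketch of that idea, expressed here as copying MDC entries into Kafka record headers rather than wrapping the payload itself (the class name and the propagated keys are assumptions for illustration, not existing spring-kafka API):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.MDC;

public final class MdcHeaderSupport {

    // Which MDC keys to carry across threads/messages; "b3" is sleuth's single-header format.
    private static final String[] PROPAGATED_KEYS = { "traceId", "b3" };

    private MdcHeaderSupport() {
    }

    // Producer side: snapshot the current thread's MDC entries into record headers.
    public static <K, V> ProducerRecord<K, V> wrap(ProducerRecord<K, V> record) {
        for (String key : PROPAGATED_KEYS) {
            String value = MDC.get(key);
            if (value != null) {
                record.headers().add(key, value.getBytes(StandardCharsets.UTF_8));
            }
        }
        return record;
    }

    // Consumer side: restore the MDC from the headers before invoking the listener.
    public static void unwrap(ConsumerRecord<?, ?> record) {
        for (String key : PROPAGATED_KEYS) {
            Header header = record.headers().lastHeader(key);
            if (header != null) {
                MDC.put(key, new String(header.value(), StandardCharsets.UTF_8));
            }
        }
    }
}

In practice something like this could be hooked into a Kafka ProducerInterceptor and spring-kafka's RecordInterceptor, though that wiring is omitted here.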

But this has implications that go beyond the application's boundaries, because it changes the message contract, and the messages might be consumed by a different application, so I don't think it's necessarily a good solution for a framework.

What do you think? Have you thought of a solution for this?

As always, it'd be nice to hear @garyrussell's and @artembilan's input.

Thanks again!

garyrussell commented 3 years ago

It's probably caused by this.

I am not sure what the root cause is; I haven't looked closely at how sleuth instruments the message and when it adds/removes the headers.