spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

In kafka events, the traceId and spanId are not written in the logs #3146

Closed oooximionnn closed 7 months ago

oooximionnn commented 7 months ago

Describe: In Kafka events, the traceId and spanId are not written in the logs.

Environment: Java 17, Spring Boot 3.2.2. We use the following dependencies for tracing:

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-otel</artifactId>
        </dependency>
        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-exporter-otlp</artifactId>
        </dependency>

Kafka Template Bean:

import io.micrometer.common.KeyValues;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;

@Configuration
@ComponentScan
@RequiredArgsConstructor
public class KafkaConfig {
    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate(final ProducerFactory<Object, Object> producerFactory) {
        final KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setObservationEnabled(true);
        kafkaTemplate.setObservationConvention(new KafkaTemplateObservationConvention() {
            @Override
            public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
                return KeyValues.of("topic", context.getDestination(),
                        "id", String.valueOf(context.getRecord().key()));
            }
        });
        return kafkaTemplate;
    }
}

Executor configuration for context propagation:

import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextScheduledExecutorService;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

@Configuration(proxyBeanMethods = false)
class CommonConfiguration {

    // Example of Async Servlets setup
    @Configuration(proxyBeanMethods = false)
    @EnableAsync
    static class AsyncConfig implements AsyncConfigurer, WebMvcConfigurer {
        @Override
        public Executor getAsyncExecutor() {
            return ContextExecutorService.wrap(Executors.newCachedThreadPool(), ContextSnapshot::captureAll);
        }

        @Override
        public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
            configurer.setTaskExecutor(new SimpleAsyncTaskExecutor(r -> new Thread(ContextSnapshotFactory.builder().build().captureAll().wrap(r))));
        }
    }

    /**
     * NAME OF THE BEAN IS IMPORTANT!
     * <p>
     * We need to wrap this for @Async related things to propagate the context.
     *
     * @see EnableAsync
     */
    // [Observability] instrumenting executors
    @Bean(name = "taskExecutor", destroyMethod = "shutdown")
    ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler() {
            @Override
            protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
                ExecutorService executorService = super.initializeExecutor(threadFactory, rejectedExecutionHandler);
                return ContextExecutorService.wrap(executorService, ContextSnapshot::captureAll);
            }

            @Override
            public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
                return ContextScheduledExecutorService.wrap(super.getScheduledExecutor());
            }
        };
        threadPoolTaskScheduler.initialize();
        return threadPoolTaskScheduler;
    }
}

application.yml file

logging:
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} %5p [${spring.application.name:-},%X{traceId:-},%X{spanId:-}] 0 --- [%15.80t] %-40.40logger{39} : %msg%n"

management:
  tracing:
    sampling:
      probability: 1.0
    propagation:
      produce: b3
      consume: b3
      type: b3,w3c

Producer code:

@Async
protected void sendMessage(Object message, String key, String topic) {
    kafkaTemplate.send(topic, key, message)
            .thenAccept(result -> log.debug("producerListener -> Message was sent. msg: {}", result))
            .exceptionally(ex -> {
                log.error("producerListener -> Error: {}", message, ex);
                return null;
            });
}

What's going on: the traceId and spanId are placed in the Kafka record headers, but the logging pattern does not find them on the callback thread and accordingly does not write them to the logs.

What we have changed:

@Async
protected void sendMessage(Object message, String key, String topic) {
    Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();

    kafkaTemplate.send(topic, key, message)
            .thenAccept(result -> {
                MDC.setContextMap(copyOfContextMap);
                log.debug("producerListener -> Message was sent. msg: {}", result);
            })
            .exceptionally(ex -> {
                MDC.setContextMap(copyOfContextMap);
                log.error("producerListener -> Error: {}", message, ex);
                return null;
            });
}

We added Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap(); to the producer, and the traceId and spanId began to appear in the logs.
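The repeated capture/restore can be factored into a small helper so each callback does not copy the MDC boilerplate by hand. Below is a minimal, self-contained sketch of the pattern; it models the MDC with a plain ThreadLocal map so it runs without dependencies (real code would use org.slf4j.MDC.getCopyOfContextMap() and setContextMap), and the helper name withMdc is illustrative, not a Spring or SLF4J API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class MdcRestoreSketch {

    // Stand-in for SLF4J's MDC: one map of diagnostic values per thread.
    static final ThreadLocal<Map<String, String>> MDC =
            ThreadLocal.withInitial(HashMap::new);

    // Capture the caller's context now; restore it around the callback later.
    // The name "withMdc" is illustrative, not a Spring or SLF4J API.
    static <T> Consumer<T> withMdc(Consumer<T> delegate) {
        Map<String, String> captured = new HashMap<>(MDC.get()); // copy at call site
        return value -> {
            Map<String, String> previous = MDC.get();
            MDC.set(captured);            // restore the producer's context
            try {
                delegate.accept(value);
            }
            finally {
                MDC.set(previous);        // put the callback thread's map back
            }
        };
    }

    static String demo() throws Exception {
        MDC.get().put("traceId", "65fc5266");
        StringBuilder seen = new StringBuilder();
        CompletableFuture<String> send = new CompletableFuture<>();
        // The callback runs on a pool thread whose MDC starts empty (compare
        // the kafka-producer-network-thread); withMdc restores the captured map.
        CompletableFuture<Void> done = send.thenAcceptAsync(
                withMdc(result -> seen.append(MDC.get().getOrDefault("traceId", "missing"))));
        send.complete("ok");
        done.get();
        return seen.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints 65fc5266
    }
}
```

Wrapping the callback once at the call site is the same idea the framework fix applies internally; only the capture mechanism differs.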

Is there a better solution that avoids writing Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap(); by hand?

We also tried the configuration from this example: https://github.com/micrometer-metrics/micrometer-samples/blob/main/kafka-producer/src/main/java/com/example/micrometer/ManualConfiguration.java. It didn't help.

The problem is related to the migration to Spring Boot 3. On Spring Boot 2, the Sleuth library was used for tracing and Kafka event tracing worked. But when we started using the examples and instructions from Micrometer, the traceId did not appear in the logs; it only appeared in the Kafka headers.

artembilan commented 7 months ago

Apparently this is a duplicate of https://github.com/micrometer-metrics/micrometer/issues/4859.

Any chance you can share a simple project with us where we can reproduce, debug and play with it? Just to be sure we are on the same page.

Thanks

oooximionnn commented 7 months ago

Here is a simple example: https://github.com/oooximionnn/KafkaTracing/

In the attached screenshot, the traceId and spanId are empty.

mkoralewski commented 7 months ago

I have a similar problem. For the ConcurrentKafkaListenerContainerFactory we set the CommonErrorHandler that you can see below:

  @Bean
  public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(0, 0);
    return new DefaultErrorHandler(
        (consumerRecord, exception) -> log.error("Failed to process ATG message from topic:{}. key:{}",
            consumerRecord.topic(), consumerRecord.key(), exception),
        fixedBackOff);
  }

When I was using Spring Boot 3.1.9, the logs produced by this handler had MDC entries (traceId, spanId and one custom BaggageField). After upgrading to Spring Boot 3.2.4, the MDC entries are gone.

artembilan commented 7 months ago

@mkoralewski ,

It is not similar. This issue is about the KafkaTemplate and KafkaProducer; yours is about the consumer side. Please consider raising a new issue with a reproducer. Moreover, this issue is about a migration from Spring Boot 2.x to 3.x, while you claim that it was OK in the previous Spring Boot minor version, even though the observation instrumentation is still the same. So something needs to be shared with us so that we can reproduce and play with it.

artembilan commented 7 months ago

@oooximionnn ,

thanks for the sample!

I didn't run it yet, but looking at the screenshot, I see that your "Сообщение отправлено успешно" ("Message sent successfully") line is logged on the kafka-producer-network-thread, which is definitely outside of your "Для прокидывания traceId в ассинхронных вызовах" ("for propagating traceId in asynchronous calls") configuration. I mean that thread belongs to the Apache Kafka client itself. I don't know what Spring did before, but apparently there is no such instrumentation for KafkaProducer. I'm not sure what it is supposed to look like, but it is definitely out of the Spring for Apache Kafka project's scope. In general, we have spread observation instrumentation throughout the Spring portfolio projects, so the KafkaTemplate instrumentation migrated from Spring Sleuth to this project. Where the KafkaProducer instrumentation lives right now, I need to figure out together with the Micrometer team. I will be back when I have more info.

artembilan commented 7 months ago

So, this was done before in Spring Sleuth: https://github.com/spring-cloud/spring-cloud-sleuth/blob/3.1.x/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/kafka/TracingKafkaProducer.java#L148. We probably need to instrument the Callback in KafkaTemplate.buildCallback() in a similar way, since we have an Observation there to rely on; in general, to do whatever was done in https://github.com/spring-cloud/spring-cloud-sleuth/blob/3.1.x/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/kafka/KafkaTracingCallback.java#L54.
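The Sleuth callback linked above ran onCompletion inside the trace context's scope; the fix described here does the equivalent with the send Observation. A minimal, self-contained model of that scoping pattern follows; TraceScope, traceId and the scoped helper are plain-JDK stand-ins for illustration, not Micrometer or Kafka API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;

public class ScopedCallbackSketch {

    static final ThreadLocal<String> traceId = new ThreadLocal<>();

    // Opening a scope installs a trace id on the current thread and restores
    // the previous value when closed; a stand-in for Observation.Scope.
    record TraceScope(String previous) implements AutoCloseable {
        static TraceScope open(String id) {
            String prev = traceId.get();
            traceId.set(id);
            return new TraceScope(prev);
        }
        @Override public void close() { traceId.set(previous); }
    }

    // Wrap a (metadata, exception) callback so it executes in the scope of the
    // trace captured at send time: the shape of the Sleuth/Micrometer fix.
    static <M, E> BiConsumer<M, E> scoped(String capturedTraceId, BiConsumer<M, E> delegate) {
        return (metadata, exception) -> {
            try (TraceScope ignored = TraceScope.open(capturedTraceId)) {
                delegate.accept(metadata, exception); // logging here sees the trace id
            }
        };
    }

    static String demo() throws InterruptedException {
        StringBuilder seen = new StringBuilder();
        CountDownLatch done = new CountDownLatch(1);
        BiConsumer<String, Exception> callback =
                scoped("091afda72b6512cc", (metadata, ex) -> {
                    seen.append(traceId.get()); // restored inside the scope
                    done.countDown();
                });
        // Simulate the kafka-producer-network-thread completing the send.
        new Thread(() -> callback.accept("test-topic-1@0", null)).start();
        done.await();
        return seen.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints 091afda72b6512cc
    }
}
```

The try-with-resources guarantees the callback thread's own context is restored even if the delegate throws, which is why the scope approach is safer than manually setting the MDC in each callback.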

artembilan commented 7 months ago

Just tested the fix against this simple unit test:

@Test
void logsAreCorrelatedFromKafkaTemplateCallback() throws ExecutionException, InterruptedException {
    this.kafkaTemplate.send("test-topic", "test data")
            .thenAccept((result) -> LOG.warn("sent: " + result))
            .get();
}

The output in the logs looks like this:

2024-03-21T11:29:42.890-04:00  WARN 39812 --- [spring-kafka-template-tracing] [cing-producer-1] [65fc526678dd9d4c091afda72b6512cc-091afda72b6512cc] ringKafkaTemplateTracingApplicationTests : sent: SendResult [producerRecord=ProducerRecord(topic=test-topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = traceparent, value = [48, 48, 45, 54, 53, 102, 99, 53, 50, 54, 54, 55, 56, 100, 100, 57, 100, 52, 99, 48, 57, 49, 97, 102, 100, 97, 55, 50, 98, 54, 53, 49, 50, 99, 99, 45, 48, 57, 49, 97, 102, 100, 97, 55, 50, 98, 54, 53, 49, 50, 99, 99, 45, 48, 48])], isReadOnly = true), key=null, value=test data, timestamp=null), recordMetadata=test-topic-1@0]

I think this is what you are looking for.

Will open PR soon.

oooximionnn commented 7 months ago

Thank you. Please tell me what I need to do in the end to get the traceId and spanId in the producer and consumer:

1) What dependencies are needed?
2) Do I need some kind of bean configuration?
3) What should be written in application.yml?
4) Is there anything else?

We are using Spring Boot 3.

artembilan commented 7 months ago

@oooximionnn , if you would like to test the fix, you need to use a SNAPSHOT of spring-kafka in your project. The release is going to happen on April 15th.

oooximionnn commented 6 months ago

@artembilan Hello, thank you. Now I see the traceId in the producer logs, but the listener logs are still without the traceId (see the attached screenshots).

artembilan commented 6 months ago

@oooximionnn ,

Your question doesn't look related to the problem we have discussed in this issue.

The @KafkaListener side is a fully different story. Please consider raising a Stack Overflow question with much more info. Meanwhile, see the observationEnabled property of ContainerProperties:

    /**
     * Set to true to enable observation via Micrometer. When false (default)
     * basic Micrometer timers are used instead (when enabled).
     * @param observationEnabled true to enable.
     * @since 3.0
     * @see #setMicrometerEnabled(boolean)
     */
    public void setObservationEnabled(boolean observationEnabled) {
        this.observationEnabled = observationEnabled;
    }

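Concretely, this property is flipped on the listener container factory. The following is a configuration fragment sketching one way to do it (the bean shape is illustrative, not taken from the poster's project), not standalone runnable code:

```java
@Bean
ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConsumerFactory<Object, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Propagate the observation (and thus traceId/spanId in the MDC)
    // into @KafkaListener invocations.
    factory.getContainerProperties().setObservationEnabled(true);
    return factory;
}
```

Recent Spring Boot versions also expose a spring.kafka.listener.observation-enabled property that achieves the same without a custom factory; verify its availability against your Boot version.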
oooximionnn commented 6 months ago

@artembilan Thank you very much, the problem is solved :)

simonzhong1985 commented 2 months ago

@artembilan Hi, which version is the fix released in? Thanks.

artembilan commented 1 month ago

This issue is labeled with 3.2.0-RC1, so everything after that (inclusive) has the fix. The linked issue also shows that it was back-ported to 3.0.16 (that generation is already End of Life) and to 3.1.4. So upgrading your project to the latest supported version will include the fix. Hope that helps.