micrometer-metrics / micrometer

An application observability facade for the most popular observability tools. Think SLF4J, but for observability.
https://micrometer.io
Apache License 2.0

Context is not propagating automatically in some cases. #4715

Closed marcioemiranda closed 7 months ago

marcioemiranda commented 7 months ago

Hello,

I have a webflux application that is instrumented with prometheus for metrics. It receives HTTP requests from users and also from prometheus (via actuator).

1) When prometheus calls the application to collect metrics, an observation is generated (by webflux) and I receive it in my custom observation handler. This handler logs the event and I can see the traceId and span.

2) When the user submits a request, an observation is generated (by webflux) and I also receive it in my custom observation handler. However, when I log the event I can't see traceId and span in the logs even though they are present in the context.

3) When a user request is processed and goes from the controller to the services (reactive stream), a few log messages are created. I can't see traceId and span on those messages either, even though traceId and span are propagated to kafka and I can see them being received by a kafka consumer.

So, my conclusion is that tracing is working, but automatic context propagation to logging MDC has some issue.

I am using the default logging system (as defined by Spring Boot) and no logback configuration file.

Below I show the timeline of log messages so that you can see where traceId and span show up and where they are missing:

producer_1  | 2024-02-03T12:15:16.054Z  INFO [simswap-producer,,] 1 --- [simswap-producer] [ad | producer_1] [                                                 ] org.apache.kafka.clients.Metadata        : [Producer clientId=producer_1] Cluster ID: OTMwNzFhYTY1ODNiNGE5OQ
producer_1  | 2024-02-03T12:15:16.088Z  INFO [simswap-producer,,] 1 --- [simswap-producer] [ad | producer_1] [                                                 ] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer_1] ProducerId set to 1000 with epoch 0

The initial messages do not show a traceId, as expected.

producer_1  | 2024-02-03T12:15:34.194Z  INFO [simswap-producer,65be2e66eed60278adf34c5b734ea655,adf34c5b734ea655] 1 --- [simswap-producer] [ctor-http-nio-2] [65be2e66eed60278adf34c5b734ea655-adf34c5b734ea655] com.vivo.simswap.cdr.config.CDRLogger    : CDR RQ: CDRLine(platform=producer, eventType=RQ, eventId=, timestamp=03/02/2024 09:15:34.192, msisdn=, imsi=, imei=, udcId=, originIp=, appServerIp=127.0.0.1, site=SP, returnCode=null, returnMessage=null)
producer_1  | 2024-02-03T12:15:34.198Z  INFO [simswap-producer,65be2e66eed60278adf34c5b734ea655,adf34c5b734ea655] 1 --- [simswap-producer] [ctor-http-nio-2] [65be2e66eed60278adf34c5b734ea655-adf34c5b734ea655] com.vivo.simswap.cdr.config.CDRLogger    : CDR RP: CDRLine(platform=producer, eventType=RP, eventId=, timestamp=03/02/2024 09:15:34.192, msisdn=null, imsi=null, imei=null, udcId=, originIp=, appServerIp=127.0.0.1, site=SP, returnCode=200, returnMessage=SUCCESS)

... then prometheus starts to make calls to collect metrics and my custom observation handler logs the events with trace Id and span.

producer_1  | 2024-02-03T12:16:01.960Z  INFO [simswap-producer,,] 1 --- [simswap-producer] [ctor-http-nio-5] [                                                 ] c.v.s.service.AddTriggerEventProducer    : produced record. key: ed33715b-f5ac-4b3c-8837-14a12a0cc954
producer_1  | 2024-02-03T12:16:01.963Z  INFO [simswap-producer,,] 1 --- [simswap-producer] [ctor-http-nio-5] [                                                 ] com.vivo.simswap.cdr.config.CDRLogger    : CDR RQ: CDRLine(platform=producer, eventType=RQ, eventId=ed33715b-f5ac-4b3c-8837-14a12a0cc954, timestamp=03/02/2024 09:16:01.963, msisdn=21956345263, imsi=313460000000200, imei=440694071795200, udcId=127.0.0.1, originIp=127.0.0.1, appServerIp=127.0.0.1, site=SP, returnCode=null, returnMessage=null)
producer_1  | 2024-02-03T12:16:01.963Z  INFO [simswap-producer,,] 1 --- [simswap-producer] [ctor-http-nio-5] [                                                 ] com.vivo.simswap.cdr.config.CDRLogger    : CDR RP: CDRLine(platform=producer, eventType=RP, eventId=ed33715b-f5ac-4b3c-8837-14a12a0cc954, timestamp=03/02/2024 09:16:01.963, msisdn=null, imsi=null, imei=null, udcId=127.0.0.1, originIp=127.0.0.1, appServerIp=127.0.0.1, site=SP, returnCode=200, returnMessage=SUCCESS)
producer_1  | 2024-02-03T12:16:02.038Z  INFO [simswap-producer,,] 1 --- [simswap-producer] [ad | producer_1] [                                                 ] c.v.s.service.AddTriggerEventProducer    : emitted event: ed33715b-f5ac-4b3c-8837-14a12a0cc954

... the user sends a request. None of the related logs show traceId and span, neither in the reactive stream nor in the observation handler.

consumer_1  | 2024-02-03 12:16:02.041 [boundedElastic-1] - [TraceId] NONE - INFO  c.v.s.s.AddTriggerEventConsumer - received message: AddTriggerEvent(id=ed33715b-f5ac-4b3c-8837-14a12a0cc954, imei=440694071795200, imsi=313460000000200, msisdn=21956345263)
consumer_1  | 2024-02-03 12:16:02.041 [boundedElastic-1] - [TraceId] NONE - INFO  c.v.s.s.AddTriggerEventConsumer - traceparent - 00-65be2e817cb88a0fa6d160760b5a35e1-a6d160760b5a35e1-01

Notice that traceId and span are propagated through kafka to a consumer service, so tracing is working.

producer_1  | 2024-02-03T12:16:14.089Z  INFO [simswap-producer,65be2e8eaba11cefb7d676f445ae6f2e,b7d676f445ae6f2e] 1 --- [simswap-producer] [ctor-http-nio-2] [65be2e8eaba11cefb7d676f445ae6f2e-b7d676f445ae6f2e] com.vivo.simswap.cdr.config.CDRLogger    : CDR RQ: CDRLine(platform=producer, eventType=RQ, eventId=, timestamp=03/02/2024 09:16:14.089, msisdn=, imsi=, imei=, udcId=, originIp=, appServerIp=127.0.0.1, site=SP, returnCode=null, returnMessage=null)
producer_1  | 2024-02-03T12:16:14.090Z  INFO [simswap-producer,65be2e8eaba11cefb7d676f445ae6f2e,b7d676f445ae6f2e] 1 --- [simswap-producer] [ctor-http-nio-2] [65be2e8eaba11cefb7d676f445ae6f2e-b7d676f445ae6f2e] com.vivo.simswap.cdr.config.CDRLogger    : CDR RP: CDRLine(platform=producer, eventType=RP, eventId=, timestamp=03/02/2024 09:16:14.089, msisdn=null, imsi=null, imei=null, udcId=, originIp=, appServerIp=127.0.0.1, site=SP, returnCode=200, returnMessage=SUCCESS)

... subsequent prometheus calls (via actuator) continue to show traceId and span in logs.

My gradle dependencies are configured like this:

dependencies {

    // mac
    implementation group: 'io.netty', name: 'netty-resolver-dns-native-macos', classifier: 'osx-aarch_64'

    // Shared
    implementation project(':common')
    implementation project(':cdr')

    // Spring Boot
    implementation platform("org.springframework.boot:spring-boot-dependencies:${springBootVersion}")
    annotationProcessor platform("org.springframework.boot:spring-boot-dependencies:${springBootVersion}")

    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'

    // tracing - Default is Brave + Zipkin, you can opt in via "-Pwavefront" to Wavefront and "-Potel" for OTel
    if (project.hasProperty('wavefront')) {
        implementation 'io.micrometer:micrometer-tracing-reporter-wavefront'
    }
    if (project.hasProperty('otel')) {
        implementation 'io.micrometer:micrometer-tracing-bridge-otel'
        implementation 'io.opentelemetry:opentelemetry-exporter-zipkin'
    }
    else {
        implementation 'io.micrometer:micrometer-tracing-bridge-brave'
        implementation 'io.zipkin.reporter2:zipkin-reporter-brave'
    }

    // tracing - reactive
    implementation 'io.micrometer:context-propagation'

    // metrics
    runtimeOnly 'io.micrometer:micrometer-registry-prometheus'

    // pushing logs out
    //runtimeOnly 'com.github.loki4j:loki-logback-appender:1.3.2'

    // xml unmarshal
    implementation 'jakarta.xml.bind:jakarta.xml.bind-api'
    implementation 'com.sun.xml.bind:jaxb-core'
    runtimeOnly 'com.sun.xml.bind:jaxb-impl'

    // kafka
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'io.projectreactor.kafka:reactor-kafka' //:1.3.22

    // utils
    implementation 'org.apache.commons:commons-lang3:3.14.0'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'

    // Logging
    implementation 'net.logstash.logback:logstash-logback-encoder:7.4'

    // tests
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testImplementation 'io.projectreactor:reactor-test'
    testImplementation 'io.micrometer:micrometer-test'
    testImplementation 'commons-io:commons-io:2.15.1'
}

Tracing is enabled in application.yaml

server:
  port: 8787

logging:
  pattern.level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"
  level:
    web: error
    root: info

spring:
  main:
    web-application-type: reactive
  application:
    name: simswap-producer
  kafka:
    producer:
      client-id: ${CLIENT_ID}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  endpoint:
    health:
      show-details: always
  endpoints:
    web:
      exposure:
        include: health, prometheus, metrics
  # For Exemplars to work we need histogram buckets
  metrics:
    distribution:
      percentiles-histogram:
        http:
          server:
            requests: true

Automatic context propagation is configured in the main class

@SpringBootApplication
public class SimswapApplication {

    public static void main(String[] args) {
        Hooks.enableAutomaticContextPropagation();
        SpringApplication.run(SimswapApplication.class, args);
    }

}

... and also in a configuration file

@Configuration
public class ObservationConfig {

    @Autowired
    ObservationRegistry registry;

    @PostConstruct
    void setup() {
        ObservationThreadLocalAccessor.getInstance().setObservationRegistry(registry);
    }
}

The controller just forwards the request to a service in a reactive stream

@PostMapping(value = "/swap", consumes = { "application/xml" }, produces = { "application/xml" })
    public Mono<ResponseEntity<Void>> addTriggerFacade(@RequestBody Envelope envelope)
            throws JsonProcessingException {
        ParamSubscriberDetails details = envelope.getBody().getSetSubscriberDetails().getParamSubscriberDetails();
        return this.service.handleAddTrigger(details.getImei(), details.getImsi(), details.getMsisdn())
                .map(ResponseEntity::ok);
    }

The service hydrates the current observation with some information from the request and uses a sink to signal the event to an event publisher

@Service
@AllArgsConstructor
public class AddTriggerService {

        private final AddTriggerEventProducer eventProducer;

        public Mono<Void> handleAddTrigger(String imei, String imsi, String msisdn) {
                return Mono.just(new AddTriggerEvent(imei, imsi, msisdn))
                                .transformDeferredContextual((eventMono, ctx) -> eventMono.doOnNext(event -> {
                                        Observation observation = ctx.get(ObservationThreadLocalAccessor.KEY);
                                        Context observationCtx = observation.getContext();
                                        observationCtx.addHighCardinalityKeyValue(
                                                        KeyValue.of(CDRHighCardinalityKeyNames.MSISDN,
                                                                        event.getMsisdn()));
                                        observationCtx
                                                        .addHighCardinalityKeyValue(
                                                                        KeyValue.of(CDRHighCardinalityKeyNames.IMSI,
                                                                                        event.getImsi()));
                                        observationCtx
                                                        .addHighCardinalityKeyValue(
                                                                        KeyValue.of(CDRHighCardinalityKeyNames.IMEI,
                                                                                        event.getImei()));
                                        observationCtx
                                                        .addHighCardinalityKeyValue(
                                                                        KeyValue.of(CDRHighCardinalityKeyNames.EVENT_ID,
                                                                                        event.getId()));
                                }))
                                .doOnNext(e -> this.eventProducer.emitEvent(e))
                                .then();
        }

}

The kafka producer processes the event from the previous service and writes some log messages (the ones that don't show traceId and span)

@Slf4j
@Service
@RequiredArgsConstructor
public class AddTriggerEventProducer {

    private final ReactiveKafkaProducerTemplate<String, AddTriggerEvent> template;
    private final Sinks.Many<AddTriggerEvent> sink;
    private final Flux<AddTriggerEvent> flux;
    private final String topic;

    public void subscribe() {
        var srFlux = this.flux
                .map(e -> new ProducerRecord<>(topic, e.getId(), e))
                .doOnNext(r -> log.info("produced record. key: {}", r.key()))
                .map(pr -> SenderRecord.create(pr, pr.key()));
        this.template.send(srFlux)
                .doOnNext(r -> log.info("emitted event: {}", r.correlationMetadata()))
                .subscribe();
    }

    public void emitEvent(AddTriggerEvent event) {
        this.sink.tryEmitNext(event);
    }

}

Except for not showing traceId and span in the logs everything is working as expected.

I can add the observation handler code if needed. As I said before, for observations initiated by actuator calls the logs show traceId and span, while for user-initiated calls the same logs don't show traceId and span. So I don't think it's a problem with the handler code, but with the MDC content.

These are some relevant dependencies (resolved by gradle):

+--- io.micrometer:micrometer-registry-prometheus -> 1.12.2 (*)
+--- io.micrometer:context-propagation -> 1.1.0
+--- org.springframework.boot:spring-boot-starter-actuator -> 3.2.2
+--- org.springframework.boot:spring-boot-dependencies:3.2.+ -> 3.2.2 (*)
+--- org.springframework.boot:spring-boot-starter-webflux -> 3.2.2 (*)
+--- io.micrometer:micrometer-tracing-bridge-brave -> 1.2.2
|    +--- io.micrometer:micrometer-tracing:1.2.2
|    |    +--- io.micrometer:micrometer-observation:1.12.2 (*)
|    |    +--- io.micrometer:context-propagation:1.1.0
|    |    \--- aopalliance:aopalliance:1.0
|    +--- org.slf4j:slf4j-api:1.7.36 -> 2.0.11
|    +--- io.zipkin.brave:brave:5.16.0
|    +--- io.zipkin.brave:brave-context-slf4j:5.16.0
|    |    \--- io.zipkin.brave:brave:5.16.0
|    +--- io.zipkin.brave:brave-instrumentation-http:5.16.0
|    |    \--- io.zipkin.brave:brave:5.16.0
|    \--- io.zipkin.aws:brave-propagation-aws:0.23.5
|         \--- io.zipkin.brave:brave:5.13.7 -> 5.16.0

To reproduce the problem I just have to submit a user request. I expected to see traceId and span in the logs generated by user requests, the same way that I see for actuator initiated requests.

I am happy to provide additional information if needed. Please advise. Many thanks.

ttddyy commented 7 months ago

I didn't go through the details but my educated guess is:

It is using MDCScopeDecorator (from Brave) which is auto-configured by Boot (see BravePropagationConfigurations). This mechanism propagates MDC with trace/span ids. However, since it is a webflux app, it doesn't propagate MDC values when reactor scheduler switches threads.

I suggest trying to register a ThreadLocalAccessor implementation for MDC. (see https://github.com/micrometer-metrics/context-propagation/pull/194)

This way, MDC should be propagated when reactor switches threads, which in turn, I think, should make trace/span ids show up properly in logs written inside the webflux operator chain.
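To make the suggestion above more concrete, here is a minimal, library-free sketch of the capture-and-restore idea that a ThreadLocalAccessor for MDC relies on. It is an illustration under assumptions, not the context-propagation API: `FakeMdc` is a hypothetical stand-in for SLF4J's MDC, and `propagating` is a hypothetical helper that plays the role the library would play when work hops to another thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-in for SLF4J's MDC: a per-thread map of diagnostic values.
final class FakeMdc {
    private static final ThreadLocal<Map<String, String>> VALUES =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) { VALUES.get().put(key, value); }
    static String get(String key) { return VALUES.get().get(key); }
    static Map<String, String> snapshot() { return new HashMap<>(VALUES.get()); }
    static void restore(Map<String, String> values) { VALUES.set(new HashMap<>(values)); }
    static void clear() { VALUES.remove(); }
}

final class MdcHandOffDemo {

    // Captures the caller's map now and restores it around the task on the worker thread.
    static <T> Supplier<T> propagating(Supplier<T> task) {
        Map<String, String> captured = FakeMdc.snapshot();
        return () -> {
            Map<String, String> previous = FakeMdc.snapshot();
            FakeMdc.restore(captured);
            try {
                return task.get();
            } finally {
                FakeMdc.restore(previous); // put back whatever the worker had before
            }
        };
    }

    // Runs a task on another thread and returns the traceId it observed (null if none).
    static String traceIdSeenOnWorker(boolean propagate) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        FakeMdc.put("traceId", "65be2e66eed60278");
        try {
            Supplier<String> readTraceId = () -> FakeMdc.get("traceId");
            Supplier<String> task = propagate ? propagating(readTraceId) : readTraceId;
            return CompletableFuture.supplyAsync(task, worker).join();
        } finally {
            FakeMdc.clear();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without propagation: " + traceIdSeenOnWorker(false));
        System.out.println("with propagation:    " + traceIdSeenOnWorker(true));
    }
}
```

Without the wrapper the worker thread sees no traceId, because the value lives only in the caller's ThreadLocal. With the wrapper the value is visible for the duration of the task and the worker's previous state is put back afterwards.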

marcioemiranda commented 7 months ago

@ttddyy thanks for the suggestions. I'll take a look into that.

I've added additional logs to understand where traceIds stop being propagated. It seems that it happens because I am using sink to signal an event.

public Mono<Void> handleAddTrigger(String imei, String imsi, String msisdn) {
                log.info("I am at the service, outside of reactive stream");
                return Mono.just(new AddTriggerEvent(imei, imsi, msisdn))
                                .transformDeferredContextual((eventMono, ctx) -> eventMono.doOnNext(event -> {
                                        Observation observation = ctx.get(ObservationThreadLocalAccessor.KEY);
                                        Context observationCtx = observation.getContext();
                                        observationCtx.addHighCardinalityKeyValue(
                                                        KeyValue.of(CDRHighCardinalityKeyNames.MSISDN,
                                                                        event.getMsisdn()));
                                        observationCtx
                                                        .addHighCardinalityKeyValue(
                                                                        KeyValue.of(CDRHighCardinalityKeyNames.IMSI,
                                                                                        event.getImsi()));
                                        observationCtx
                                                        .addHighCardinalityKeyValue(
                                                                        KeyValue.of(CDRHighCardinalityKeyNames.IMEI,
                                                                                        event.getImei()));
                                        observationCtx
                                                        .addHighCardinalityKeyValue(
                                                                        KeyValue.of(CDRHighCardinalityKeyNames.EVENT_ID,
                                                                                        event.getEventId()));
                                }))
                                .doOnNext(e -> this.eventProducer.emitEvent(e))
                                .doOnNext(e -> log.info("I am at the service, inside the reactive stream"))
                                .then();
        }
public void emitEvent(AddTriggerEvent event) {
        this.sink.tryEmitNext(event);
    }

The logs written from the controller and the service show traceId and span info, so this is not a bug but rather a lack of proper understanding on my part of how this propagation actually works. I thought the context-propagation library would handle every scenario automatically.

producer_1  | 2024-02-04T13:57:36.175Z  INFO 1 --- [simswap-producer] [ctor-http-nio-6] [65bf97d0d37f2251033960b12677edb9-033960b12677edb9] com.vivo.simswap.web.SoapController      : I am at the controller, outside of reactive stream
producer_1  | 2024-02-04T13:57:36.176Z  INFO 1 --- [simswap-producer] [ctor-http-nio-6] [65bf97d0d37f2251033960b12677edb9-033960b12677edb9] c.v.simswap.service.AddTriggerService    : I am at the service, outside of reactive stream

producer_1  | 2024-02-04T13:57:36.195Z  INFO 1 --- [simswap-producer] [ctor-http-nio-6] [                                                 ] c.v.s.service.AddTriggerEventProducer    : produced record. key: 124e0272-8c1c-412d-b47d-e4e92f1f56c9

producer_1  | 2024-02-04T13:57:36.195Z  INFO 1 --- [simswap-producer] [ctor-http-nio-6] [65bf97d0d37f2251033960b12677edb9-033960b12677edb9] c.v.simswap.service.AddTriggerService    : I am at the service, inside the reactive stream

producer_1  | 2024-02-04T13:57:36.198Z  INFO 1 --- [simswap-producer] [ctor-http-nio-6] [                                                 ] com.vivo.simswap.cdr.logging.CDRLogger   : CDR RQ: CDRLine(platform=producer, eventType=RQ, eventId=124e0272-8c1c-412d-b47d-e4e92f1f56c9, timestamp=04/02/2024 10:57:36.197, msisdn=21956345263, imsi=313460000000200, imei=440694071795200, udcId=127.0.0.1, originIp=127.0.0.1, appServerIp=127.0.0.1, site=SP, returnCode=null, returnMessage=null)

producer_1  | 2024-02-04T13:57:36.198Z  INFO 1 --- [simswap-producer] [ctor-http-nio-6] [                                                 ] com.vivo.simswap.cdr.logging.CDRLogger   : CDR RP: CDRLine(platform=producer, eventType=RP, eventId=124e0272-8c1c-412d-b47d-e4e92f1f56c9, timestamp=04/02/2024 10:57:36.197, msisdn=null, imsi=null, imei=null, udcId=127.0.0.1, originIp=127.0.0.1, appServerIp=127.0.0.1, site=SP, returnCode=200, returnMessage=SUCCESS)

producer_1  | 2024-02-04T13:57:36.323Z  INFO 1 --- [simswap-producer] [ad | producer_1] [                                                 ] c.v.s.service.AddTriggerEventProducer    : emitted event: 124e0272-8c1c-412d-b47d-e4e92f1f56c9

ttddyy commented 7 months ago

Since the provided code is not a minimal example (it involves kafka, etc.), I didn't fully understand the scenario. However, to me, it looks like propagation is working as expected.

In the provided log, there are three entries that have populated trace/span ids. They are associated with the original http request, so propagation happens. The other log entries, which are missing ids and come from AddTriggerEventProducer and CDRLogger, do not seem to be associated with the original request; they are produced by the pipeline that consumes from the sink. I'm not sure about the messaging part, but for the sink, whoever is processing it is usually outside of the original request, so I don't think it can propagate the information from the original request. Therefore, those entries don't show ids in the log.
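As a rough illustration of that last point, the following sketch uses the JDK's `SubmissionPublisher` as a stand-in for a Reactor sink (an analogy only, since Reactor's Context mechanics differ). The consumer subscribes once, before any request exists, so it never sees the emitter's ThreadLocal; only data that travels inside the event reaches it. `EventBusDemo`, `DomainEvent`, and `TRACE_ID` are hypothetical names chosen for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class EventBusDemo {

    // Hypothetical stand-in for a request-scoped trace id kept in a ThreadLocal (like MDC).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Event payload plus metadata captured on the emitting side.
    record DomainEvent(String data, String traceId) {}

    // Emits one event from a simulated request and returns what the bus consumer recorded.
    static List<String> emitAndCollect() {
        List<String> lines = new CopyOnWriteArrayList<>();
        CountDownLatch completed = new CountDownLatch(1);

        SubmissionPublisher<DomainEvent> bus = new SubmissionPublisher<>();
        // Subscribed once at "startup", outside of any request.
        bus.subscribe(new Flow.Subscriber<DomainEvent>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(DomainEvent event) {
                lines.add("threadLocal=" + TRACE_ID.get() + " metadata=" + event.traceId());
            }
            @Override public void onError(Throwable t) { completed.countDown(); }
            @Override public void onComplete() { completed.countDown(); }
        });

        // "Request" side: the trace id exists only on the emitting thread.
        TRACE_ID.set("65bf97d0d37f2251");
        try {
            bus.submit(new DomainEvent("event-1", TRACE_ID.get()));
        } finally {
            TRACE_ID.remove();
        }
        bus.close(); // onComplete is signalled after the pending event is delivered

        try {
            if (!completed.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumer did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        emitAndCollect().forEach(System.out::println);
    }
}
```

The consumer records `threadLocal=null` next to the id carried in the event metadata, which is why the ambient request state alone cannot reach a subscriber that was set up outside the request.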

marcioemiranda commented 7 months ago

Hello again,

@ttddyy, thanks for the previous comments. I am getting closer, just one part is missing.

We use Sinks to de-couple components inside the app. It acts as an "event-bus". A producer component emits an event using Sinks.Many and an event consumer listens from the Sink getting a Flux of events. In this case the producer is the Webflux HTTP stream and the consumer is a Reactive kafka producer stream.

As observed in the previous comment, we lose context when we do that.

The solution so far was to add the Observation to the metadata section of the event in the producer side, and restore the observation to ThreadLocal in the consumer side.

By doing that we solved the MDC issue and also the messaging context propagation (now Kafka is propagating the correct traceId created in the HTTP request).

Producer side

Mono.just(new MyEvent(UUID.randomUUID().toString(), otherdata))
                                .transformDeferredContextual((eventMono, ctx) -> eventMono.doOnNext(event -> {
                                        Observation observation = ctx.get(ObservationThreadLocalAccessor.KEY);
                                        Context observationCtx = observation.getContext();
                                        //... enrich current observation ctx with tags
                                }).map(e -> DomainEvent.<MyEvent>builder()
                                                .data(e)
                                                .currentObservation(ctx.get(ObservationThreadLocalAccessor.KEY))
                                                .build())) // DomainEvent wraps event data and metadata such as Observation
                                .doOnNext(de -> this.eventProducer.emitEvent(de))  // Emit wrapped event
                                .then();

Consumer side

var srFlux = this.flux
                .doOnNext(de -> {
                    if (de.getObservation() != null) { // Receive observation as event metadata
                        ObservationThreadLocalAccessor.getInstance().restore(de.getObservation()); //restore using ObservationThreadLocalAccessor
                    }
                })
                .map(de -> de.getData()) // use event data as payload to kafka message
                .map(e -> new ProducerRecord<>(topic, createKey(e), e))
                .doOnNext(r -> log.info("produced record. key: {}", r.key()))  // now I have traceId and span in MDC
                .map(pr -> SenderRecord.create(pr, pr.key()));  // using message key as correlationMetadata, but could be traceId

this.template.send(srFlux) // now PropagatingSenderTracingObservationHandler propagates proper traceId
                .doOnNext(r -> log.info("emitted event: {}", r.correlationMetadata())) // this part, that runs after kafka sends the message, does not have MDC propagated
                .subscribe();

The missing part is the doOnNext operator that executes after Kafka sends the message. It runs in a different thread and the MDC is not propagated there by reactor automatically.

The Kafka library provides a correlationMetadata that I could use. By passing the traceId as correlationMetadata in the sender, it would be available in the response. Then I could use it to restore MDC. Is there a better way of doing this? Why was the MDC not propagated by reactor in this case?

marcioemiranda commented 7 months ago

Hello again,

Everything is working as expected now. I had to do some refactoring and add a contextWrite close to the subscribe call. So, the root cause of this issue was the use of Sinks.Many to publish events. Passing the observation as metadata (plus some refactoring in the event consumer) solved the issue.

In case anyone is interested, this is the final version of the event consumer.

public void subscribe() {
        this.flux
                .flatMap(this::sendRecord)
                .subscribe();
    }

    private Flux<SenderResult<String>> sendRecord(DomainEvent<AddTriggerEvent> domainEvent) {
        return Flux.just(domainEvent.getData())
                .map(e -> new ProducerRecord<>(topic, createKey(e), e))
                .doOnNext(r -> log.info("produced record. key: {}", r.key()))
                .map(pr -> SenderRecord.create(pr, pr.key()))
                .flatMap(sr -> this.template.send(sr)
                        .doOnError(e -> log.error("Failed to send kafka message", e))
                        .doOnNext(
                                r -> log.info(
                                        "Successfully sent kafka record. CorrelationMetadata [{}] Partition [{}] Offset[{}]",
                                        r.correlationMetadata(), r.recordMetadata().partition(),
                                        r.recordMetadata().offset())))
                .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY,
                        domainEvent.getObservation()));

    }

Cheers.

chemicL commented 7 months ago

@marcioemiranda thanks for sharing your solution with the community. The approach you arrived at is exactly the way it should be done using the existing tooling. I'm glad you were able to figure it out 👏

Some commentary: The attempt at restoring the ThreadLocals with doOnNext and ObservationThreadLocalAccessor is error-prone, as it sets the ThreadLocals on some Thread and never cleans up; at the same time, execution that continues on a different Thread would again lack those ThreadLocal values. The way it is implemented in Reactor is that the ThreadLocal values are transparently restored and cleaned up as long as the Subscriber at the bottom of the chain has the corresponding Context, precisely the way you implemented and suggested above. Again, thanks for sharing ❤️
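The cleanup concern can be reproduced with plain JDK classes. The sketch below is a simplified model, not Reactor or Micrometer code: `TRACE_ID` is a hypothetical ThreadLocal standing in for the MDC entry, and a single-thread executor stands in for a reused event-loop thread. It compares setting the value with no cleanup against a scoped restore that puts the previous value back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class RestoreCleanupDemo {

    // Hypothetical stand-in for a tracing ThreadLocal such as the MDC traceId.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Sets the value and never cleans up, similar to restoring inside a plain callback.
    static void restoreWithoutCleanup(String traceId) {
        TRACE_ID.set(traceId);
    }

    // Sets the value only while the work runs, then puts the old value back.
    static void restoreScoped(String traceId, Runnable work) {
        String previous = TRACE_ID.get();
        TRACE_ID.set(traceId);
        try {
            work.run();
        } finally {
            if (previous == null) {
                TRACE_ID.remove();
            } else {
                TRACE_ID.set(previous);
            }
        }
    }

    // Handles "request A" and then runs an unrelated task on the same pooled thread;
    // returns the trace id that the unrelated task observes.
    static String seenByNextTask(boolean scoped) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.runAsync(() -> {
                if (scoped) {
                    restoreScoped("trace-A", () -> { /* log with trace-A here */ });
                } else {
                    restoreWithoutCleanup("trace-A");
                }
            }, pool).join();
            return CompletableFuture.supplyAsync(TRACE_ID::get, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("next task after unscoped restore sees: " + seenByNextTask(false));
        System.out.println("next task after scoped restore sees:   " + seenByNextTask(true));
    }
}
```

With the unscoped variant, the unrelated task inherits `trace-A` from the previous request. With the scoped variant it sees nothing, which is the behavior one would want from automatic restoration.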

marcioemiranda commented 7 months ago

Thanks @chemicL. Actually, your posts and YouTube video were very helpful in figuring this out. Great content!

Sharing here in case anyone is interested. Must read/watch stuff.

https://www.youtube.com/watch?v=6OKS36PSpho
https://spring.io/blog/2023/03/28/context-propagation-with-project-reactor-1-the-basics