open-telemetry / opentelemetry-java-instrumentation

OpenTelemetry auto-instrumentation and instrumentation libraries for Java
https://opentelemetry.io
Apache License 2.0

OpenTelemetry Kafka consumer doesn't inject context in headers #11259

Open kokikathir opened 2 months ago

kokikathir commented 2 months ago

Describe the bug

In OpenTracing, the consumer interceptor injects the consumer span context into the record headers: https://github.com/opentracing-contrib/java-kafka-client/blob/master/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaUtils.java#L138

With OpenTelemetry, it is not injected: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java#L292

Steps to reproduce

Please check the above code

Expected behavior

Span context to be injected in headers

Actual behavior

Span context not being injected in headers

Javaagent or library instrumentation version

main

Environment

JDK: OS:

Additional context

No response

laurit commented 2 months ago

You could extract the context from the original context propagation header.

Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(
    Context.root(), record, new TextMapGetter<ConsumerRecord<?, ?>>() {
      @Override
      public Iterable<String> keys(ConsumerRecord<?, ?> consumerRecord) {
        return StreamSupport.stream(consumerRecord.headers().spliterator(), false)
            .map(Header::key)
            .collect(Collectors.toList());
      }

      @Nullable
      @Override
      public String get(@Nullable ConsumerRecord<?, ?> consumerRecord, String key) {
        Header header = consumerRecord.headers().lastHeader(key);
        if (header == null) {
          return null;
        }
        byte[] value = header.value();
        if (value == null) {
          return null;
        }
        return new String(value, StandardCharsets.UTF_8);
      }
    });
Span span = Span.fromContext(context);
if (span.getSpanContext().isValid()) {
  System.err.println(span.getSpanContext().getTraceId() + " " + span.getSpanContext().getSpanId());
}

When using the Java agent, you can replace openTelemetry with GlobalOpenTelemetry.get(). Would that work for you?

bnigh commented 2 months ago

I also see issues when migrating projects that use reactor-kafka from OpenTracing to OpenTelemetry.

This issue seems to arise because in reactor-kafka the KafkaReceiver.receive (or [ReactiveKafkaConsumerTemplate](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.html#receive())) returns a Flux (meaning there is a single reactor context for the entire flux - not per record). Additionally with project reactor, code execution may use different schedulers and threads (so the active span from the thread context is not always accurate). This means if I have code like

kafkaReceiver.receive()
    .flatMap(consumerRecord -> {
        // some code that instruments or generates a nested span
    })

then for the nested span, the span information in the reactor context or the active span (in the thread context) may not accurately refer to the consumer span.
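The thread-context part of this can be illustrated without any tracing library. The following is a minimal sketch, assuming a plain `ThreadLocal` as a stand-in for the thread-bound "current span"; it is not OpenTelemetry's `Context` and it runs without the Java agent, which does try to propagate context across Reactor operators. The class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadContextSketch {
  // Stand-in for a thread-bound "current span" (assumption: plain ThreadLocal, no agent).
  static final ThreadLocal<String> CURRENT_SPAN = new ThreadLocal<>();

  // Runs a task on a separate worker thread and returns its result.
  private static String callOnWorker(Callable<String> task) {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    try {
      return worker.submit(task).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      worker.shutdown();
    }
  }

  // Reads the thread-bound value from the worker thread, where it was never set.
  static String readThreadBound() {
    return callOnWorker(CURRENT_SPAN::get);
  }

  // Carries the value explicitly, the way a per-record header would.
  static String readCaptured(String captured) {
    return callOnWorker(() -> captured);
  }

  public static void main(String[] args) {
    CURRENT_SPAN.set("consumer span for trace 1");
    System.out.println("thread-bound on worker: " + readThreadBound());
    System.out.println("captured on worker: " + readCaptured(CURRENT_SPAN.get()));
  }
}
```

The thread-bound lookup does not see the value set on the calling thread, while the explicitly carried value survives the thread hop. This is the reason a per-record carrier such as a header is attractive here.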

This works for OpenTracing because the interceptor writes the consumer span into the headers. With OpenTelemetry, the consumer span is not written to the headers.


As an example, with OpenTracing, if one application publishes to a topic and another application consumes from it and then performs some nested operation (a custom application span, a downstream REST service call, a publish to another Kafka topic, ...), then I would expect a trace similar to

To_topic (producer span for trace 1)
  └─ From_topic (consumer span for trace 1)
      └─ nested (nested application span for trace 1)

For a similar example for open tracing with 2 traces, I would expect traces similar to

topic send (producer span for trace 1)
  └─ topic process (consumer span for trace 1)
      └─ nested (nested application span for trace 1)

topic send (producer span for trace 2)
  └─ topic process (consumer span for trace 2)
      └─ nested (nested application span for trace 2)

But if I use the active span as the parent of the nested spans, I can end up with spans referring to parent spans for another trace

topic send (producer span for trace 1)
  └─ topic process (consumer span for trace 1)
      └─ nested (nested application span for trace 1)
      └─ nested (nested application span for trace 2)

topic send (producer span for trace 2)
  └─ topic process (consumer span for trace 2)

And if I use the headers to determine the parent of the nested spans, the parent then refers to the producer span (since the consumer never writes to the header)

topic send (producer span for trace 1)
  ├─ topic process (consumer span for trace 1)
  └─ nested (nested application span for trace 1)

topic send (producer span for trace 2)
  ├─ topic process (consumer span for trace 2)
  └─ nested (nested application span for trace 2)
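The tree above follows from the W3C `traceparent` format, `version-traceid-parentid-flags`: the header carries the span ID of whoever wrote it last. A minimal sketch, assuming the producer wrote `traceparent` and the consumer never rewrote it; the class name, method name, and example IDs are hypothetical, and a `Map` stands in for the Kafka record headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class TraceparentSketch {
  // Returns the span ID field of a W3C traceparent value (version-traceid-parentid-flags).
  static String spanIdOf(String traceparent) {
    String[] parts = traceparent.split("-");
    if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
      throw new IllegalArgumentException("not a traceparent value: " + traceparent);
    }
    return parts[2];
  }

  public static void main(String[] args) {
    String producerSpanId = "00f067aa0ba902b7";

    // Producer side: the header is written with the producer span ID.
    Map<String, byte[]> headers = new HashMap<>();
    headers.put(
        "traceparent",
        ("00-4bf92f3577b34da6a3ce929d0e0e4736-" + producerSpanId + "-01")
            .getBytes(StandardCharsets.UTF_8));

    // Consumer side: the header is read but not rewritten, so a nested span
    // that takes its parent from the header is attached to the producer span.
    String parent = spanIdOf(new String(headers.get("traceparent"), StandardCharsets.UTF_8));
    System.out.println("nested span parent is producer span: " + parent.equals(producerSpanId));
  }
}
```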

Is there another way to achieve the desired outcome of the following, or is an enhancement needed to opentelemetry?

topic send (producer span for trace 1)
  └─ topic process (consumer span for trace 1)
      └─ nested (nested application span for trace 1)

topic send (producer span for trace 2)
  └─ topic process (consumer span for trace 2)
      └─ nested (nested application span for trace 2)

github-actions[bot] commented 1 month ago

This has been automatically marked as stale because it has been marked as needing author feedback and has not had any activity for 7 days. It will be closed automatically if there is no response from the author within 7 additional days from this comment.

laurit commented 1 month ago

@bnigh Thanks, now I understand that the ask is to add the consumer span (not the producer span) info to the headers. If I understand correctly, OpenTracing overwrites the tracing headers that originally held the producer span so that they contain the consumer span instead. I think we probably shouldn't do that, but we could write the tracing info for the consumer span in W3C format to some other header, such as otel-consumer. We need to consider whether this should also be done in other messaging instrumentations.
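A rough sketch of what that suggestion could look like, using only the JDK. Everything here is an assumption for illustration: `otel-consumer` is just the header name floated above and not an existing convention, a `Map` stands in for Kafka's record headers, and the class and method names are hypothetical. Nothing like this exists in the instrumentation today.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class ConsumerHeaderSketch {
  // Hypothetical header name taken from the suggestion above.
  static final String CONSUMER_HEADER = "otel-consumer";

  // Formats a W3C traceparent value: version-traceid-parentid-flags.
  static String traceparent(String traceId, String spanId, boolean sampled) {
    if (!traceId.matches("[0-9a-f]{32}") || !spanId.matches("[0-9a-f]{16}")) {
      throw new IllegalArgumentException("invalid trace or span id");
    }
    return "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00");
  }

  // Adds the consumer span under its own key; the producer's "traceparent" is left untouched.
  static void addConsumerContext(
      Map<String, byte[]> headers, String traceId, String consumerSpanId, boolean sampled) {
    headers.put(
        CONSUMER_HEADER,
        traceparent(traceId, consumerSpanId, sampled).getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    String traceId = "4bf92f3577b34da6a3ce929d0e0e4736";
    Map<String, byte[]> headers = new LinkedHashMap<>();
    // Header as written by the producer.
    headers.put(
        "traceparent",
        traceparent(traceId, "00f067aa0ba902b7", true).getBytes(StandardCharsets.UTF_8));
    // Consumer adds its own span without overwriting the producer's header.
    addConsumerContext(headers, traceId, "b7ad6b7169203331", true);
    headers.forEach((k, v) -> System.out.println(k + ": " + new String(v, StandardCharsets.UTF_8)));
  }
}
```

Application code in the `flatMap` could then read the consumer header per record to parent nested spans, while the original `traceparent` still points at the producer span.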

trask commented 1 month ago

> with project reactor, code execution may use different schedulers and threads (so the active span from the thread context is not always accurate)

The Java agent does try to automatically propagate context for Project Reactor code. Can you narrow down more specifically where the context propagation is failing?

bnigh commented 1 month ago

@laurit That is correct. OpenTracing appends the tracing header, but OpenTelemetry does not.

@trask

> the java agent does try to automatically propagate context for project reactor code, can you narrow down more specifically where the context propagation is failing?

Please see this code from my previous comment, where we use KafkaReceiver from reactor-kafka:

This issue seems to arise because in reactor-kafka the KafkaReceiver.receive (or [ReactiveKafkaConsumerTemplate](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.html#receive())) returns a Flux (meaning there is a single reactor context for the entire flux - not per record). Additionally with project reactor, code execution may use different schedulers and threads (so the active span from the thread context is not always accurate). This means if I have code like

kafkaReceiver.receive()
    .flatMap(consumerRecord -> {
        // (a) some code that instruments or generates a nested span
    })

I believe a full reproducible example was shared in https://github.com/open-telemetry/opentelemetry-java-instrumentation/discussions/9771