open-telemetry / opentelemetry-java-instrumentation

OpenTelemetry auto-instrumentation and instrumentation libraries for Java
https://opentelemetry.io
Apache License 2.0

Multiple trace Id's generated from kafka consumer #2531

Open kurvatch opened 3 years ago

kurvatch commented 3 years ago

Describe the bug We have deployed three services: Service A, Service B, and Service C. Service A interacts with Service B via an API call and drops a message onto a Kafka topic, which is consumed by Service C. Service C consumes the record, calls an external API, and inserts data into a database. We observe multiple trace IDs generated for the database and external API calls from the Kafka consumer, instead of multiple span IDs under a single trace.

Services Interaction: https://github.com/sunvuz/OpenTelemetry-poc/blob/30cdb74b4ac53bbba08fbc510e5900b3dc6bdf12/Kafka%20Consumer%20-New%20Traceid%20issue.PNG

Zipkin: Image with multiple trace IDs from the consumer https://github.com/sunvuz/OpenTelemetry-poc/blob/30cdb74b4ac53bbba08fbc510e5900b3dc6bdf12/Kafka%20Consumer%20-New%20Traceid%20issue%20-Zipkin.PNG

Steps to reproduce Sample services mimicking the above behavior are available at: https://github.com/sunvuz/OpenTelemetry-poc

What did you expect to see? Span IDs for the database and external API interactions under a single trace ID.

What did you see instead? New trace IDs generated for the database and external API interactions.

What version are you using? v1.0.0; the behavior is the same with 0.x.x versions.

Environment JDK 8+, OS: Linux


previousdeveloper commented 3 years ago

We have the same problem. We use a Spring Boot Kafka consumer. When the consumer listens to batch messages, every operation gets its own trace ID instead of a span ID within one trace. There is no problem when the consumer is not consuming in batches. I think it's related to https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1890

sunvuz commented 3 years ago

@previousdeveloper Tried the following options: 1) in the Spring Boot application.yml, setting spring.kafka.listener.type = single; 2) in org.apache.kafka.clients.consumer.ConsumerConfig, setting MAX_POLL_RECORDS_CONFIG = 1. Still have the same issue.

Please share your configuration for "no batch message consumer"
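For reference, the two settings above map to the following Spring Boot configuration (a sketch assuming the standard spring-kafka property names; values shown are the ones we tried):

```yaml
spring:
  kafka:
    listener:
      type: single          # one record per listener invocation
    consumer:
      max-poll-records: 1   # maps to ConsumerConfig.MAX_POLL_RECORDS_CONFIG
```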

dquagebeur commented 3 years ago

Hi, I don't know if it is the same problem, but it looks very similar. In our case, we are using Kafka Connect.

First, a source connector produces a message and inserts it into Kafka. The message contains a "traceparent" header (visible in AKHQ).

A second connector consumes the message and makes external HTTP requests. The first HTTP request contains one traceparent header, the second call contains two traceparent headers (not a second value, but two headers named traceparent with different values), and so on...

We have upgraded to version 1.0.0 and the problem is still there.

previousdeveloper commented 3 years ago

@anuraaga hey do you have any idea about this issue?

anuraaga commented 3 years ago

@previousdeveloper I'm not an expert in Kafka, so it will take a bit to dig in. But the main thing that comes to mind is that I don't think we currently have instrumentation for reactive streams used with the Kafka client

https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients

which is used in the sample app

https://github.com/sunvuz/OpenTelemetry-poc/blob/master/consumeMicroService/src/main/java/com/sample/awsplayground/consumer/SampleConsumer.java#L114

I wouldn't be surprised if this breaks context propagation into the consumer, which would mean we need to implement support for it.

trask commented 2 years ago

If you are still having this issue, can you try with the latest version, 1.13.1? There were a couple of related Kafka instrumentation fixes that could resolve it.

m-kay commented 2 years ago

I have tried version 1.13.1 of the javaagent and it still does not work with batch listeners.

Is there a way to manually add links from all the messages in the batch to the auto-generated spans?

mateuszrzeszutek commented 2 years ago

Hey @m-kay , Are you using spring-kafka? What version?

m-kay commented 2 years ago

I'm using spring-kafka 2.8.5 and spring-cloud-stream-binder-kafka 3.2.2, so my consumer is defined as a function bean rather than a @KafkaListener-annotated method. Could that be causing the problem?
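For context, a spring-cloud-stream batch consumer defined as a function bean looks roughly like this (a minimal sketch assuming binder defaults; the bean name, binding name, and payload type are illustrative, not taken from my app):

```java
import java.util.List;
import java.util.function.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class BatchConsumerConfig {

  // Bound to the input binding "demo-in-0"; with
  // spring.cloud.stream.bindings.demo-in-0.consumer.batch-mode=true
  // the binder delivers a whole poll's worth of records as one List,
  // so there is no single upstream traceparent to restore.
  @Bean
  public Consumer<List<Message<String>>> demo() {
    return messages -> messages.forEach(m -> process(m.getPayload()));
  }

  private void process(String payload) {
    // each element here originally carried its own traceparent header
  }
}
```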

mateuszrzeszutek commented 2 years ago

Hmm, that's possible - we're only testing plain spring-kafka usage. Can you prepare a small repro app? That'd really help us pinpoint the issue.

m-kay commented 2 years ago

I have created a demo app with two consumers, one consuming messages in batch mode and the other as single messages: https://github.com/m-kay/otel-demo-http-sink

When sending a message to the topic demo-input-topic with a traceparent header set, it is picked up correctly by the single-message consumer, but not by the batch consumer.

m-kay commented 2 years ago

I also added a batch listener using the annotation @KafkaListener but that doesn't work either 🤔

m-kay commented 2 years ago

Is there something I could do with manual instrumentation to work around this issue? Maybe add all the trace parents from the batch as links to the current span generated by the auto-instrumentation?
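One possible workaround sketch, using only the stable OpenTelemetry API rather than the agent's internals (the class name and span name are illustrative): extract each record's traceparent yourself and attach the resulting span contexts as links when starting one span for the whole batch.

```java
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class BatchLinks {

  // Reads W3C trace-context headers out of a Kafka record.
  private static final TextMapGetter<ConsumerRecord<?, ?>> GETTER =
      new TextMapGetter<>() {
        @Override
        public Iterable<String> keys(ConsumerRecord<?, ?> record) {
          List<String> keys = new ArrayList<>();
          record.headers().forEach(h -> keys.add(h.key()));
          return keys;
        }

        @Override
        public String get(ConsumerRecord<?, ?> record, String key) {
          var header = record.headers().lastHeader(key);
          return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
        }
      };

  // Starts one span for the whole batch, linked to each record's upstream trace.
  public static Span startBatchSpan(Tracer tracer, List<ConsumerRecord<?, ?>> batch) {
    SpanBuilder builder = tracer.spanBuilder("demo-input-topic process");
    for (ConsumerRecord<?, ?> record : batch) {
      Context extracted =
          W3CTraceContextPropagator.getInstance().extract(Context.root(), record, GETTER);
      SpanContext linked = Span.fromContext(extracted).getSpanContext();
      if (linked.isValid()) {
        builder.addLink(linked);
      }
    }
    return builder.startSpan();
  }
}
```

The caller would then process the batch inside a `try (var scope = span.makeCurrent())` block and call `span.end()` afterwards, so the downstream HTTP and database spans share one trace that links back to every producer trace.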

m-kay commented 1 year ago

@mateuszrzeszutek did you already have a chance to look into this?

mateuszrzeszutek commented 1 year ago

Hey @m-kay , Sorry, I haven't had the time to take a look at this.

jbaris commented 1 year ago

Is there any progress on this? We also have the same issue...

jbaris commented 1 year ago

I think the issue is in instrumentation/kafka/kafka-clients/kafka-clients-0.11. There is an instrumentation called ConsumerRecordsInstrumentation with an inner class, IteratorAdvice, that is applied to the org.apache.kafka.clients.consumer.ConsumerRecords#iterator() method. Basically, the advice wraps the result of the iterator() method in a TracingIterator object, which overrides the iterator's two main methods, hasNext() and next().

In our case we are using Camel as an abstraction over Kafka, but the issue can apply to other libraries/frameworks. Camel processes the ConsumerRecords in KafkaRecordProcessorFacade#processPolledRecords(). The root cause of the issue is that hasNext() is called twice per record:

while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
   ConsumerRecord<Object, Object> record = recordIterator.next();
   lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, kafkaRecordProcessor, record);
   ...
}

So the process span (and its context) is closed earlier than expected, and a new traceId is created for the following spans.
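The interaction can be reproduced without Kafka or the agent at all. In this minimal plain-Java sketch (all names are illustrative), next() starts a simulated "process span" and hasNext() ends it, mirroring the TracingIterator behavior described above; Camel's extra hasNext() call then means every record is processed with no active span:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TracingIteratorSketch {
  public static String currentSpan = null;                 // simulated active span
  public static List<Boolean> processedInSpan = new ArrayList<>();

  // Wraps an iterator the way the advice does: next() opens a span for
  // the record, and any call to hasNext() closes the open span.
  public static <T> Iterator<T> wrap(Iterator<T> delegate) {
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        currentSpan = null;                                // hasNext() ends the span
        return delegate.hasNext();
      }

      @Override
      public T next() {
        T record = delegate.next();
        currentSpan = "process " + record;                 // next() starts a new span
        return record;
      }
    };
  }

  public static void main(String[] args) {
    Iterator<String> it = wrap(List.of("a", "b").iterator());
    while (it.hasNext()) {
      String record = it.next();
      boolean partitionHasNext = it.hasNext();             // Camel's second hasNext() call
      processedInSpan.add(currentSpan != null);            // processRecord(record) runs here
    }
    System.out.println(processedInSpan);                   // prints [false, false]
  }
}
```

Both records are processed while no span is active, which is exactly why the downstream spans end up on fresh trace IDs.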

UPDATE: I made this ugly change, but it works as expected for me: https://github.com/jbaris/opentelemetry-java-instrumentation/commit/77fd79785383615734d64366b1b58d4257b8d151