open-telemetry / opentelemetry-java-instrumentation

OpenTelemetry auto-instrumentation and instrumentation libraries for Java
https://opentelemetry.io
Apache License 2.0

Spring Kafka batch listener keeps using the same traceId after an exception is thrown. #10698

Closed: tkramarczyk-vgw closed this issue 8 months ago.

tkramarczyk-vgw commented 9 months ago

Describe the bug

Once an exception is thrown in a @KafkaListener batch listener, the records are retried, but from then on every new poll reuses the same traceId.

At first, each batch is processed under a new traceId:

[trace_id=ea183b2234da006e22b90741d03ba550, trace_flags=01, span_id=ddd9e45f221e04f4] 10:56:50.299 INFO c.e.k.KafkaTracingListener1Application: consumed [aaa-aaa] 
[trace_id=5e569f0325ed6e3325e269062cca1ab8, trace_flags=01, span_id=4e69a18c3728f1f4] 10:56:58.228 INFO c.e.k.KafkaTracingListener1Application: consumed [bbb-bbb] 

Then, on the third batch, the example listener throws an exception:

[trace_id=dc73c698e8bb5f03dcecfbf22d586c5d, trace_flags=01, span_id=238f3e5a10892eca] 10:57:01.979 INFO c.e.k.KafkaTracingListener1Application: consumed [ccc-ccc] 
[trace_id=dc73c698e8bb5f03dcecfbf22d586c5d, trace_flags=01, span_id=238f3e5a10892eca] 10:57:01.982 ERROR c.e.k.KafkaTracingListener1Application: Error! 

The batch is then retried under a new traceId:

[trace_id=1f63157677fef738bde3f5dc24190262, trace_flags=01, span_id=16ca9fde6264ce48] 10:57:02.009 INFO c.e.k.KafkaTracingListener1Application: consumed [ccc-ccc] 

However, every subsequent poll, and every record it returns, keeps using that same traceId:

[trace_id=1f63157677fef738bde3f5dc24190262, trace_flags=01, span_id=16ca9fde6264ce48] 10:58:41.943 INFO c.e.k.KafkaTracingListener1Application: consumed [ddd-ddd] 
[trace_id=1f63157677fef738bde3f5dc24190262, trace_flags=01, span_id=16ca9fde6264ce48] 10:58:45.180 INFO c.e.k.KafkaTracingListener1Application: consumed [eee-eee]
[trace_id=1f63157677fef738bde3f5dc24190262, trace_flags=01, span_id=16ca9fde6264ce48] 10:58:49.236 INFO c.e.k.KafkaTracingListener1Application: consumed [fff-fff]

Once the exception has occurred, SupportabilityMetrics logs the following:

DEBUG io.opentelemetry.javaagent.shaded.instrumentation.api.internal.SupportabilityMetrics - Suppressed Spans by 'io.opentelemetry.spring-kafka-2.7' (CONSUMER) : 1

The same behaviour is not observed with a single-record (non-batch) listener.
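One possible reading of these symptoms, offered as an assumption and not as a diagnosis from the report: the span created for the retried batch is left current on the consumer thread, and because a CONSUMER span is then already current, the agent suppresses the CONSUMER span for every later batch (the SupportabilityMetrics line above does show a suppressed CONSUMER span). That would explain why both trace_id and span_id repeat in the logs. The self-contained Kotlin model below illustrates that mechanism only; FakeSpan, FakeContext, startConsumerSpan, processBatch and replay are hypothetical names, and none of this is OpenTelemetry's API or the agent's implementation.

```kotlin
import java.util.UUID

// Illustrative model only: NOT OpenTelemetry's API and NOT the agent's implementation.
enum class Kind { CONSUMER, INTERNAL }

data class FakeSpan(val traceId: String, val spanId: String, val kind: Kind)

object FakeContext {
    // As with OpenTelemetry's default context storage, the "current" span is per-thread.
    private val storage = ThreadLocal<FakeSpan?>()

    fun current(): FakeSpan? = storage.get()

    // Makes `span` current and returns a function that restores the previous span
    // (the analogue of closing a Scope).
    fun makeCurrent(span: FakeSpan): () -> Unit {
        val previous = storage.get()
        storage.set(span)
        return { storage.set(previous) }
    }
}

fun newId(hexChars: Int): String = UUID.randomUUID().toString().replace("-", "").take(hexChars)

// Kind-based suppression: if a CONSUMER span is already current, no new CONSUMER span
// is started and the current one is reused, so its trace_id and span_id repeat.
fun startConsumerSpan(): Pair<FakeSpan, Boolean> {
    val existing = FakeContext.current()
    if (existing != null && existing.kind == Kind.CONSUMER) return existing to true
    return FakeSpan(newId(32), newId(16), Kind.CONSUMER) to false
}

private val noop: () -> Unit = {}

// Processes one batch. `leakScope = true` models a code path that never restores the
// previous context (hypothetically, the retry path); `fail = true` models the listener throwing.
fun processBatch(leakScope: Boolean = false, fail: Boolean = false): FakeSpan {
    val (span, suppressed) = startConsumerSpan()
    val restore = if (suppressed) noop else FakeContext.makeCurrent(span)
    try {
        println("[trace_id=${span.traceId}, span_id=${span.spanId}] consumed batch")
        if (fail) throw RuntimeException("sth went wrong")
    } finally {
        if (!leakScope) restore()
    }
    return span
}

// Replays the sequence seen in the logs of this report.
fun replay(): List<FakeSpan> {
    val spans = mutableListOf<FakeSpan>()
    spans += processBatch()                   // [aaa-aaa]: new trace
    spans += processBatch()                   // [bbb-bbb]: new trace
    runCatching { processBatch(fail = true) } // [ccc-ccc]: throws, context restored
    spans += processBatch(leakScope = true)   // [ccc-ccc] retry: new trace, left current
    spans += processBatch()                   // [ddd-ddd]: suppressed, reuses the retry's span
    spans += processBatch()                   // [eee-eee]: same again
    return spans
}
```

Calling replay() prints one line per batch; in this model the last three lines share the retried batch's trace_id and span_id, matching the pattern in the logs above. The model takes "which path leaks" as a parameter because the report does not say where the context is left open.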

Steps to reproduce

Here's a simple one-file application:

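import io.github.oshai.kotlinlogging.KotlinLogging // kotlin-logging 5.x+; older versions use mu.KotlinLogging
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

private val logger = KotlinLogging.logger {}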

@SpringBootApplication
class KafkaTracingListener1Application

fun main(args: Array<String>) {
    runApplication<KafkaTracingListener1Application>(*args)
}

@Component
class BatchListener {

    var counter = 0

    @KafkaListener(topics = ["myTopic"])
    fun consume(events: List<ConsumerRecord<String, String>>) {
        logger.info { "consumed ${events.map { it.key() + "-" + it.value() }}" }
        counter++
        // Fail on the third batch; the container's error handler then retries it
        if (counter == 3) {
            logger.error { "Error!" }
            throw RuntimeException("sth went wrong")
        }
    }
}

And its configuration:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      type: batch
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security:
        protocol: PLAINTEXT

Expected behavior

Every batch of records gets a new traceId.
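This expectation can be checked mechanically against log lines in the format quoted above. The Kotlin sketch below is a hypothetical helper (Consumed, parse and reusedTraceIds are illustrative names, not part of any library); its regex assumes exactly the [trace_id=..., trace_flags=..., span_id=...] prefix shown in these logs, followed by a "consumed [...]" message.

```kotlin
// Matches the MDC prefix and the "consumed [...]" payload of the log lines in this report.
private val LOG_LINE =
    Regex("""\[trace_id=([0-9a-f]{32}), trace_flags=[0-9a-f]{2}, span_id=([0-9a-f]{16})\].*consumed \[(.*)\]""")

data class Consumed(val traceId: String, val spanId: String, val records: String)

// Keeps only "consumed" lines; other lines (e.g. the ERROR line) are skipped.
fun parse(lines: List<String>): List<Consumed> = lines.mapNotNull { line ->
    LOG_LINE.find(line)?.destructured?.let { (traceId, spanId, records) ->
        Consumed(traceId, spanId, records)
    }
}

// Returns every trace_id used by more than one consumed batch, mapped to those batches,
// i.e. the cases the expected behavior rules out.
fun reusedTraceIds(lines: List<String>): Map<String, List<String>> =
    parse(lines)
        .groupBy({ it.traceId }, { it.records })
        .filterValues { it.size > 1 }
```

Fed the log lines from this report, it flags only 1f63157677fef738bde3f5dc24190262, shared by the retried [ccc-ccc] batch and the [ddd-ddd], [eee-eee] and [fff-fff] batches.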

Actual behavior

After an exception is thrown, the traceId is not reset: the retried batch and every later batch share one traceId.

Javaagent or library instrumentation version

2.1.0

Environment

Java agent: Manifest-Version: 1.0 Implementation-Title: javaagent Implementation-Version: 2.1.0 ...

Spring: id("org.springframework.boot") version "3.2.3" id("io.spring.dependency-management") version "1.1.4"

Run on: JDK 17


laurit commented 9 months ago

@tkramarczyk-vgw, could you provide the full sample application as a GitHub repo or a zip?

tkramarczyk-vgw commented 9 months ago

@laurit here's the repo: https://github.com/tkramarczyk-vgw/kafka-batch-listener-otel-bug