spring-cloud / spring-cloud-sleuth

Distributed tracing for spring cloud
https://spring.io/projects/spring-cloud-sleuth
Apache License 2.0
1.77k stars 781 forks source link

Spring Cloud Sleuth not propagating b3 traces from kafka records via Kafka Stream Binder #1953

Closed richardkabiling closed 8 months ago

richardkabiling commented 3 years ago

Issue Description Logging in spring-cloud-stream kafka-streams-binder does not log traces. It seems b3 traces are not picked up from the record in the topic.

Steps to replicate

  1. Bring up environment
    docker-compose up -d
    ./gradlew clean bootRun

    In a separate console, tail topic. If this step fails due to non-existent topic, please perform step 2 once to trigger topic auto-creation first.

    docker-compose exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic resources --property print.key=true --property print.headers=true --property print.timestamp=true
  2. Emitted the following data into a resources topic (via HTTP -> Producer -> topic)
    curl -X PUT 'localhost:25400/namespaces/a/resources/3' \
    --header 'Content-Type: application/json' \
    --data-raw '{
    "name": "C"
    }'
  3. Confirm in application logs that b3 traces are generated/propagated
    2021-05-20 10:14:45.103  INFO [,bb4e9443b6bcd085,bb4e9443b6bcd085] 7211 --- [ctor-http-nio-3] c.e.p.kafka.sleuth.ResourcesController   : Received request to map resource. namespace=a, resourceId=3, request=MapResourceRequest(name=C)
  4. Confirm record in topic if b3 traces are propagated
    CreateTime:1621476885103        b3:bb4e9443b6bcd085-c06a30a6b67ae651-0,__TypeId__:com.example.poc.kafka.sleuth.Resource a:3     {"namespace":"a","id":"3","name":"C"}
  5. Spring cloud stream kafka binder is able to pick-up record, but not the b3 traces
    2021-05-20 10:14:45.132  INFO [,,] 7211 --- [-StreamThread-1] c.e.poc.kafka.sleuth.ProcessingConfig    : Received to materialize resource. a:3=Resource(namespace=a, id=3, name=C)
    extra["springCloudVersion"] = "2020.0.2"
    ...
    dependencies {
        ...
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    implementation("org.springframework.cloud:spring-cloud-starter-sleuth")
        ...
    }
    spring.cloud.stream.function.definition=materializeResources
    spring.cloud.stream.kafka.streams.binder.functions.materializeResources.application-id=resourceMaterializerProcessor
    spring.cloud.stream.bindings.materializeResources-in-0.destination=resources
    ...
    spring.sleuth.messaging.kafka.enabled=true
    spring.sleuth.messaging.kafka.streams.enabled=true
    @Bean
    fun materializeResources(resourceSerde: JsonSerde<Resource>): Consumer<KStream<String, Resource>> {
        return Consumer {
            it.peek { key, value -> logger.info { "Received to materialize resource. $key=$value" } }
                .toTable(Materialized.`as`<String, Resource, KeyValueStore<Bytes, ByteArray>>("resources-store")
                    .withKeySerde(Serdes.StringSerde())
                    .withValueSerde(resourceSerde))
        }

Sample

  1. Please clone: https://github.com/richardkabiling/kafka-sleuth (for java maven sample: clone https://github.com/richardkabiling/kafka-sleuth-maven-java instead)
  2. Follow instructions above
marcingrzejszczak commented 3 years ago

Hi! Would it be a big ask for you to rewrite this project to Maven and Java? I have constant issues with Gradle and Kotlin with Intellij :cry:

richardkabiling commented 3 years ago

I can probably try regenerating from initializer. Please give me some time to rewrite.

richardkabiling commented 3 years ago

@marcingrzejszczak - rewrote to java and maven. Please try cloning this instead: https://github.com/richardkabiling/kafka-sleuth-maven-java

Also updated issue. For the most part behavior is still the same:

kafka-console-consumer:

CreateTime:1621530585201        b3:bf864af90968acfb-e9a8f91c1303ecb9-0,__TypeId__:com.example.poc.kafka.sleuth.Resource a:e     {"namespace":"a","id":"e","name":"5"}

application logs:

2021-05-21 01:09:45.188  INFO [,bf864af90968acfb,bf864af90968acfb] 25280 --- [or-http-epoll-2] c.e.poc.kafka.sleuth.ResourceController  : Received request to map resource. namespace=a, resourceId=e. request=MapIssuerRequest(name=5)
...
2021-05-21 01:09:45.213  INFO [,,] 25280 --- [-StreamThread-1] c.e.poc.kafka.sleuth.ProcessingConfig    : Received request to materialize resource. key=a:e, value=Resource(namespace=a, id=e, name=5)

trace ID (bf864af90968acfb) appeared in both the Controller logs and the kafka console consumer but not in the logs by the ProcessingConfig.

marcingrzejszczak commented 3 years ago

I think that this can be related https://github.com/spring-cloud/spring-cloud-sleuth/issues/1965

marcingrzejszczak commented 3 years ago

We would need help with documenting the workaround for #1965. Anybody is interested in helping us out?

MisterRnobe commented 2 years ago

Hi guys, I recently faced the same problem and ReactorSleuth.tracedMono wrapper usage works fine for me. All the nested .flatMap calls log provided trace. Here is an example, may be it'll help you:

class KafkaListener(
    private val consumerTemplate: ReactiveKafkaConsumerTemplate<String, String>,
    private val kafkaProcessor: KafkaMessageHandler,
    private val tracer: Tracer,
    private val propagator: Propagator
) : DisposableBean {

    private lateinit var disposable: Disposable

    @PostConstruct
    fun startConsuming() {
        disposable = consumerTemplate
            .receiveAutoAck()
            .doOnNext { log.debug { "Received $it" } }
            .flatMap { record ->

                val spanBuilder = propagator.extract(record, TracingKafkaPropagatorGetter())
                val span = spanBuilder.start()

                ReactorSleuth.tracedMono(tracer, span) {
                    kafkaProcessor.process(record)
                        .onErrorResume { Mono.just(false) }
                }
            }
            .subscribe { log.debug { "Handled, success: $it" } }
    }

    override fun destroy() = disposable.dispose()

}
marcingrzejszczak commented 8 months ago

Please upgrade to Micrometer Tracing. Spring Cloud Sleuth is feature complete and out of OSS support.