spring-cloud / spring-cloud-sleuth

Distributed tracing for spring cloud
https://spring.io/projects/spring-cloud-sleuth
Apache License 2.0
1.76k stars 782 forks source link

Add support for @KafkaListener annotations #1001

Closed danielsouza85 closed 6 years ago

danielsouza85 commented 6 years ago

Hello, I´m new with kafka and sleuth. And I´m having some problems on tracing a producer and a consumer over kafka.

I have one microservice that put some messages into a kafka topic, and the same microservice has a consumer, listening the same topic.

The first part is working just fine, but when the listener consume the message, the log shows no traceID, spanID. Everithing is empty.

I´ve enabled the debug for sleuth and at the producer part, I can see lots of debug messages, but at the consumer part, the only thing that was show is the log, without any trace information.

Here is the consumer code:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {
    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @Value("${app.topic.topic02}")
    private String topic;

    @KafkaListener(topics = "${app.topic.topic02}")
    public void listen(@Payload String message,@Headers MessageHeaders messageHeaders) {
        //System.out.println("received message = " + message + " to topic = " + topic);
        LOG.info("received message = " + message + " to topic = " + topic);
    }

}

And the Logs with debug enabled:

----> Producer Part

2018-06-05 17:13:22.561 DEBUG [apollo11-referencemicroservice,,,] 15046 --- [tp2067180044-14] o.s.c.sleuth.instrument.web.TraceFilter  : Received a request to uri [/sendMessageToKafka] that should not be sampled [false]
2018-06-05 17:13:22.562 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.sleuth.instrument.web.TraceFilter  : No parent span present - creating a new span
2018-06-05 17:13:22.564 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.s.i.web.TraceHandlerInterceptor    : Handling span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.564 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.s.i.web.TraceHandlerInterceptor    : Adding a method tag with value [sendMessageToKafka] to a span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.564 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.s.i.web.TraceHandlerInterceptor    : Adding a class tag with value [Application] to a span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.567 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [-KafkaService-2] ConcurrencyStrategy$HystrixTraceCallable : Continuing span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.568  INFO [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [-KafkaService-2] b.com.b3.apollo11.kafka.producer.Sender  : Trying to send message to Kafka:
{"prazo":"LONGO","objetivo":"APTO","perfilInvestidor":"SABER_VALOR_FINAL","formaRemuneracao":"FINAL_INVESTIMENRO","uuid":"123e4567-e89b-12d3-a456-426655440000"}
2018-06-05 17:13:22.568  INFO [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [-KafkaService-2] b.com.b3.apollo11.kafka.producer.Sender  : sending message = {"prazo":"LONGO","objetivo":"APTO","perfilInvestidor":"SABER_VALOR_FINAL","formaRemuneracao":"FINAL_INVESTIMENRO","uuid":"123e4567-e89b-12d3-a456-426655440000"} to topic = operationA
2018-06-05 17:13:22.569 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [-KafkaService-2] ConcurrencyStrategy$HystrixTraceCallable : Detaching span since it was continued [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true]
2018-06-05 17:13:22.571 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.sleuth.instrument.web.TraceFilter  : Closing the span [Trace: 3071bc3b73453c71, Span: 3071bc3b73453c71, Parent: null, exportable:true] since the response was successful
2018-06-05 17:13:22.572 DEBUG [apollo11-referencemicroservice,3071bc3b73453c71,3071bc3b73453c71,true] 15046 --- [tp2067180044-14] o.s.c.s.zipkin2.DefaultEndpointLocator   : Span will contain serviceName [apollo11-referencemicroservice]

----> Consumer Part

2018-06-05 17:13:22.575  INFO **[apollo11-referencemicroservice,,,]** 15046 --- [ntainer#0-0-C-1] b.c.b3.apollo11.kafka.consumer.Receiver  : received message = {"prazo":"LONGO","objetivo":"APTO","perfilInvestidor":"SABER_VALOR_FINAL","formaRemuneracao":"FINAL_INVESTIMENRO","uuid":"123e4567-e89b-12d3-a456-426655440000"} to topic = operationA
marcingrzejszczak commented 6 years ago

I don't recall supporting @KafkaListener. If you check out what Brave offers - https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients it works fine when you manually create the listening component. If you use Spring Integration or Spring Cloud Stream, we will automatically continue the trace. I will leave this issue as an enhancement for future releases of Sleuth to add automated support for @KafkaListener

danielsouza85 commented 6 years ago

So, I have to implement my own consumer using Brave, that´s right? As I sad, I´m new with kafka end spring. Do you have a for dummyes example on how to do that?

marcingrzejszczak commented 6 years ago

The easiest approach is to use Spring Cloud Stream https://cloud.spring.io/spring-cloud-stream/

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void handle(Person person) {
        System.out.println("Received: " + person);
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String toString() {
            return this.name;
        }
    }
}

add the spring-cloud-stream-kafka-binder dependency, point it to the proper destination and that's it. There's plenty of samples here https://github.com/spring-cloud/spring-cloud-stream-samples and in the internet.

danielsouza85 commented 6 years ago

Thank you for the help. Do you have a simple example on how to use the KafkaTracing for a producer and consumer? I´m searshing for something on the internet, but didn´t find anything that I can understand.

marcingrzejszczak commented 6 years ago

That's the readme of brave https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-clients/README.md and we instrument it here https://github.com/spring-cloud/spring-cloud-sleuth/blob/master/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceMessagingAutoConfiguration.java

danielsouza85 commented 6 years ago

One more question. At this part:

kafkaTracing = KafkaTracing.newBuilder(tracing) .remoteServiceName("my-broker") .build();

What is this tracing variable? I´m current using the sleuth to control the tracing ID and Span ID. I have to implement My own trace or is there a way to get the trace that is being used?

marcingrzejszczak commented 6 years ago

We're already instrumenting that for you. You don't have to create this bean.

We describe in the documentation how we use Brave (https://cloud.spring.io/spring-cloud-static/Finchley.RC2/single/spring-cloud.html#_introduction_to_brave). You can read about what Tracing is in the Brave's javadocs https://github.com/openzipkin/brave/blob/master/brave/src/main/java/brave/Tracing.java#L27 . It's the core bean that contains references to all the other tracing components.

My own trace or is there a way to get the trace that is being used?

Please read the documentation. If you check the latest docs and you search for what you've asked you will find this section https://cloud.spring.io/spring-cloud-sleuth/single/spring-cloud-sleuth.html#_current_span

danielsouza85 commented 6 years ago

For example:

I have a code like this:

KafkaProducer<String,String> kafkaProducer;
kafkaProducer.send(new ProducerRecord <>(topic, partitions[0], null, message))

If I want tos end trace information through the kafka do a consumer, Is this what I have to do?

KafkaTracing kafkaTracing = KafkaTracing.newBuilder(tracing).remoteServiceName("my-broker").build();
kafkaTracing.producer(kafkaProducer).send(new ProducerRecord <>(topic, partitions[0], null, message));

If that´s correct, how can I get the current traceing?

marcingrzejszczak commented 6 years ago

No, that's completely incorrect.

I wrote to you

We're already instrumenting that for you. You don't have to create this bean.

And you have created the object yourself. Just use the kafkaProducer normally and that's it.

If that´s correct, how can I get the current traceing?

I don't understand this question. If you want the current tracer just autowire it. I think we describe this in the documentation? Can you please read it?

danielsouza85 commented 6 years ago

Sorry for asking so many questions, but I´m stucked at this for some time, and I´m still in the dark.

I read that these traces can only be propagated through kafka versions >0.11.0. Is that true? I´m using version 0.10.1.0. This could be a problem?

marcingrzejszczak commented 6 years ago

I have no idea what you're doing, but it might be somewhat related to https://github.com/spring-cloud/spring-cloud-sleuth/issues/1005 . With Finchley, Spring Cloud Stream Kafka binder uses 0.11 and automatically propagates the header. From what I know in Kafka 0.10 there was no notion of headers as such cc @artembilan

artembilan commented 6 years ago

That's correct. You need to upgrade to Apache Kafka >= 0.11. That's from where it has started to support headers. Spring Kafka will propagate them all for you then automatically. Therefore these tracing headers will go from the producer over Kafka to consumer transparently for you.

I'm afraid we can do nothing for you in that old Kafka version.

See more info in the Spring Kafka Reference Manual: https://docs.spring.io/spring-kafka/docs/2.1.6.RELEASE/reference/html/_reference.html#headers

danielsouza85 commented 6 years ago

@artembilan and @marcingrzejszczak, thanks for the informations.

So, I´ve my kafka updated to version 2.12-1.1.0. Now, it´s ready for headers.

As told by @marcingrzejszczak, I´ve changed my aplication to, I´ve rewrite the producer and the consumer, but I still can´t see the traceID beeng propagated through kafka.

At my pom, I have the following:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.12.RELEASE</version>
</parent>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Edgware.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-zipkin</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.3.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>

The application.properties is:

#Configurações do microserviço
server.port=8080
spring.application.name = apollo11-referencemicroservice

#Configurações do serviço de persistência
spring.service.persistence.host=cldnp00604d.internalenv.corp
spring.service.persistence.port=8586

#Configurações da integração com o MongoDB
spring.data.mongodb.host=cldnp00604d.internalenv.corp
spring.data.mongodb.port=27017
spring.data.mongodb.database=contract
spring.data.mongodb.username=apollo11
spring.data.mongodb.password=apollo11pwd

#Configurações da integração com o Kafka
app.topic.topic01=zipkin
app.topic.topic02=operationA
app.consumer.partitions=0
app.sender.partitions=0

#Configurações para o Sleuth + Zipkin com kafka
spring.sleuth.sampler.percentage=1.0
spring.zipkin.sender.type=kafka
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.kafka.binder.brokers=HOST:9092
spring.cloud.stream.kafka.binder.zkNodes=HOST:2181
spring.cloud.stream.kafka.binder.headers=spanTraceId,spanId,spanParrentSpanId,spanProcessId

spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.output.destination=operationA
spring.cloud.stream.bindings.input.destination=operationA

The consumer code is like bellow:

@EnableBinding(Source.class)
@RestController
@SpringBootApplication
public class Application {

    private static final Logger LOG = LoggerFactory.getLogger(Application.class);

    @Bean
    public RestTemplate rest(RestTemplateBuilder builder) {
        return builder.build();
    }

    @RequestMapping(value="/sendKafkaComTrace", method = RequestMethod.POST)
    public void sendKafkaComTrace(@RequestBody String message){
        LOG.info("-------------------Entrando no sendKafkaComTrace");
        //send message to channel
        Message<String> msg = MessageBuilder.withPayload(message).setHeader("TESTEHEADER", "TESTE").build();
    LOG.info("---------- Messagem a Enviar = {}" + msg.toString());

        mysource.output().send(msg);

    }

    public static void main(String[] args) {
        SpringApplication.run (Application.class, args);
    }
}

And the consumer:

@EnableBinding(Sink.class)
@SpringBootApplication
public class Receiver {
    @Value("${app.topic.topic02}")
    private String topic;
    @Value("${app.consumer.partitions}")
    private String[] partitions;

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    public static void main(String[] args) {
        SpringApplication.run(Receiver.class, args);
      }

      @StreamListener(target=Sink.INPUT)
      public void logfast(GenericMessage msg) {
          LOG.info("++++++++++ Header:" + msg.getHeaders().toString());
          LOG.info("++++++++++ Received message = {}" + msg.toString());
        System.out.println(msg);
      }
}

Executing the code, I can see the following logs:

Producer:

2018-06-19 13:35:15.160  INFO [apollo11-referencemicroservice,5072403c0b9773cc,5072403c0b9773cc,true] 13436 --- [tp1830745997-17] br.com.b3.apollo11.Application           : -------------------Entrando no sendKafkaComTrace
2018-06-19 13:35:15.160  INFO [apollo11-referencemicroservice,5072403c0b9773cc,5072403c0b9773cc,true] 13436 --- [tp1830745997-17] br.com.b3.apollo11.Application           : ---------- Messagem a Enviar = {}GenericMessage [payload={"prazo":"LONGO","objetivo":"ESTUDO","perfilInvestidor":"SABER_VALOR_FINAL","formaRemuneracao":"FINAL_INVESTIMENRO","uuid":"123e4567-e89b-12d3-a456-426655440000"}, headers={TESTEHEADER=TESTE, id=1e3ae519-591f-35f2-00ed-aa0509b72d2d, timestamp=1529426115160}]

Consumer:

2018-06-19 13:35:15.170  INFO [apollo11-referencemicroservice,d5e9832e7ffaab3e,d5e9832e7ffaab3e,true] 13436 --- [ consumer-0-C-1] b.c.b.a.p.m.kafka.consumer.Receiver      : ++++++++++ Header:{kafka_timestampType=CREATE_TIME, messageSent=true, kafka_receivedMessageKey=null, kafka_receivedTopic=operationA, spanName=message:input, spanTraceId=d5e9832e7ffaab3e, spanId=d5e9832e7ffaab3e, nativeHeaders={spanTraceId=[d5e9832e7ffaab3e], spanId=[d5e9832e7ffaab3e], spanName=[message:input], spanSampled=[1]}, kafka_offset=196, id=d95da8cb-9fba-40fe-5088-049e5c766795, kafka_receivedPartitionId=0, spanSampled=1, kafka_receivedTimestamp=1529426115163, contentType=application/json}
2018-06-19 13:35:15.170  INFO [apollo11-referencemicroservice,d5e9832e7ffaab3e,d5e9832e7ffaab3e,true] 13436 --- [ consumer-0-C-1] b.c.b.a.p.m.kafka.consumer.Receiver      : ++++++++++ Received message = {}GenericMessage [payload=byte[162], headers={kafka_timestampType=CREATE_TIME, messageSent=true, kafka_receivedMessageKey=null, kafka_receivedTopic=operationA, spanName=message:input, spanTraceId=d5e9832e7ffaab3e, spanId=d5e9832e7ffaab3e, nativeHeaders={spanTraceId=[d5e9832e7ffaab3e], spanId=[d5e9832e7ffaab3e], spanName=[message:input], spanSampled=[1]}, kafka_offset=196, id=d95da8cb-9fba-40fe-5088-049e5c766795, kafka_receivedPartitionId=0, spanSampled=1, kafka_receivedTimestamp=1529426115163, contentType=application/json}]
GenericMessage [payload=byte[162], headers={kafka_timestampType=CREATE_TIME, messageSent=true, kafka_receivedMessageKey=null, kafka_receivedTopic=operationA, spanName=message:input, spanTraceId=d5e9832e7ffaab3e, spanId=d5e9832e7ffaab3e, nativeHeaders={spanTraceId=[d5e9832e7ffaab3e], spanId=[d5e9832e7ffaab3e], spanName=[message:input], spanSampled=[1]}, kafka_offset=196, id=d95da8cb-9fba-40fe-5088-049e5c766795, kafka_receivedPartitionId=0, spanSampled=1, kafka_receivedTimestamp=1529426115163, contentType=application/json}]

So, we can see that the producer and consumer TraceIDs are different. And at the consumer, I can´t see nothing at the headers, even the custom header i´ve inserted manually at the producer. Is there any problems with the spring-kafka, spring-boot and spring-cloud versions I´m using?

artembilan commented 6 years ago

Not sure what is your 2.12-1.1.0, but it doesn't look like you override anything about Kafka in your POM. Here is a procedure how to do that: https://docs.spring.io/spring-kafka/docs/2.1.6.RELEASE/reference/html/deps-for-11x.html

Pay attention that we rely there on the kafka_2.11. there is nothing about 2.12.

Also you need to be sure that your Kafka Broker is exactly newer version, but not < 0.11. There is not enough to upgrade dependencies, if the target broker is not in the proper version.

Also see Compatibility Matrix: https://spring.io/projects/spring-kafka#overview. The Spring Kafka 1.3.x is not compatible with Apache Kafka 1.1.x.

I know this is tough, but there is no choice with Kafka if you would like more features.

marcingrzejszczak commented 6 years ago

I'm sorry but this is a very simple feature that has been working since day one. You have to simplify your example and post it on github after following the guidlines from @artembilan . E.g. your snippets don't even compile (where did you take mySource from?). We won't be able to help you otherwise.

fsopjani commented 6 years ago

@danielsouza85 Remove your sensitive info in your application.properties, you've got host, passwords everything on the configuration properties exposed to public unnecessary.

timtebeek commented 6 years ago

Hi! Seems this issue went off-course with unrelated issues for a moment; coming back on topic I would also like to use plain @KafkaListener annotations whilst propagating trace/span data from my producers. This issue seems to indicate that's not (yet) possible, but in my sample project here the tests seem to indicate it works: https://github.com/timtebeek/sleuth-kafkalistener

Now I'm puzzled; can anyone either confirm this works, or point out the flaw in my example project? If there's documentation to be updated that would also be helpful, although I wouldn't really know what to add or where. Any help appreciated. :)

marcingrzejszczak commented 6 years ago

It just works https://github.com/spring-cloud/spring-cloud-sleuth/pull/1088