reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

GH-321 / PR 325 - No trace observed in neither log, neither trace aggregation system #370

Closed patpatpat123 closed 1 year ago

patpatpat123 commented 1 year ago

Hello team,

First of all, I wanted to thank you for the PR GH-321 / PR 325. I am observing an issue and would like to start this report, hoping you will get some time to take a look at this.

Expected Behavior

Using the latest version of reactor-kafka, <version>1.3.22-SNAPSHOT</version>, I would expect to see traces in my logs (code below) and in trace aggregation systems (Zipkin and co.)

Actual Behavior

However, traces are present in neither of them

Steps to Reproduce

Here are the parts of the code:

Some parts of the pom:

  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.5</version>
        <relativePath/>
    </parent>

 <dependencies>
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
            <version>1.3.22-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core-micrometer</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-elastic</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-otel</artifactId>
        </dependency>
        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-exporter-otlp</artifactId>
        </dependency>
    </dependencies>
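
For reference: with micrometer-tracing-bridge-otel and opentelemetry-exporter-otlp on the classpath, Spring Boot 3.1 can also configure the OTLP exporter from properties alone, so a hand-built SpanExporter bean is optional. A minimal sketch, assuming Boot's OTLP auto-configuration and an illustrative collector URL:

    # values are illustrative; Boot builds an OtlpHttpSpanExporter for this endpoint
    management.otlp.tracing.endpoint=http://localhost:4318/v1/traces
    # sample everything while debugging; the default rate is 0.1 (10%)
    management.tracing.sampling.probability=1.0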

Parts of the Consumer configuration:

@Configuration
public class ConsumerConfiguration {

 @Bean
    public KafkaReceiver<String, String> kafkaReceiver(MeterRegistry registry, ObservationRegistry observationRegistry) {
        final Map<String, Object> properties = new HashMap<>();
//        properties.put(SSL_PROTOCOL, SSL_VALUE);
//        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreLocation);
//        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassphrase);
//        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
//        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassphrase);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "myConsumerGroup");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(properties);
        return KafkaReceiver.create(receiverOptions
                .addAssignListener(p -> LOGGER.info("    partitions assigned {}", p))
                .addRevokeListener(p -> LOGGER.info("    partitions revoked {}", p))
                .consumerListener(new MicrometerConsumerListener(registry)) // This is for metrics, it was there from past PR
                .withObservation(observationRegistry) // This is for tracing, the interesting part
                .subscription(Collections.singleton("theTopic")));
    }
}
@Configuration
public class TracingConfiguration {
    @Bean
    public SpanExporter otlpGrpcSpanExporter() {
        return OtlpGrpcSpanExporter.builder()
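                // note: the OTLP gRPC exporter endpoint must include a scheme (and typically port 4317), e.g. "http://my-zipkin.com:4317"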
                .setEndpoint("my-zipkin.com")
                .build();
    }
}

I have a service that will do the consuming part, just writing it down:

@Service
public final class MyService implements CommandLineRunner {
    @Override
    public void run(final String... args) {
        transformService.transform(
                extractService.extract())
                .subscribe();
    }
}

The consuming part:

    public Flux<ConsumerRecord<String, String>> extract() {
        return kafkaReceiver
                .receive()
                .flatMap(message -> {
                    Observation receiverObservation =
                            KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
                                    KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                                    () ->
                                            new KafkaRecordReceiverContext(
                                                    message, "user.receiver", "myKafkaServer"),
                                    observationRegistry);
                    return Mono.just(message)
//                            .flatMap(TARGET_RECORD_HANDLER) I do not understand what that is, sorry
                            .doOnTerminate(receiverObservation::stop)
                            .doOnError(receiverObservation::error)
                            .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
                });
    }
    public Flux<String> transform(final Flux<ConsumerRecord<String, String>> consumerRecordFlux) {
        return consumerRecordFlux
                .flatMap(consumerRecord -> {
                    LOGGER.info("<----- Would be great to see the traces here! But nothing " + consumerRecord.value().toLowerCase());
                    // here will be some method I would like to have metrics on
                    return Mono.just(consumerRecord.value().toLowerCase());
                })
                .tap(Micrometer.observation(observationRegistry));
    }

Here is the log configuration:

logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss} ${PID} %-5level --- [%thread] [${spring.application.name:},%X{traceId:-},%X{spanId:-}] %logger{36} : %msg%n

And unfortunately, here is a log line, without a trace:

2023-11-02 07:59:45 13356 INFO  --- [reactive-kafka-myConsumerGroup-1] [myApp,,] transformService : "<----- Would be great to see the traces here! But nothing " {"someKey":"someValue"}

There is also nothing in Zipkin or any other trace aggregation system. I would have expected to see traces in Zipkin and, in the log, something like [myApp,73b62c0f90d11e06,73b62c0f90d11e06]

Possible Solution

I do not know, could you please help?

Your Environment

Spring Boot 3.1.5, Reactor Kafka 1.3.22-SNAPSHOT, GraalVM Java 21

janchristian-haddorp commented 1 year ago

@patpatpat123, following my tests it should work this way: your actual code goes inside the observation wrapper.

@EventListener(ApplicationStartedEvent.class)
public Disposable consume() {
    return kafkaReceiver
            .receive()
            .flatMap(message -> {
                Observation receiverObservation =
                        KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
                                KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                                () ->
                                        new KafkaRecordReceiverContext(
                                                message, "user.receiver", "myKafkaServer"),
                                observationRegistry);
                return Mono.just(message)
                        .flatMap(service::transform) // your actual consumer code
                        .doOnTerminate(receiverObservation::stop)
                        .doOnError(receiverObservation::error)
                        .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
            })
            .subscribe();
}

So, your transform method works on a ConsumerRecord directly:

public Mono<String> transform(final ConsumerRecord<String, String> consumerRecord) {
    LOGGER.info("<----- Would be great to see the traces here! But nothing " + consumerRecord.value().toLowerCase());
    // here will be some method I would like to have metrics on
    return Mono.just(consumerRecord.value().toLowerCase());
}

Never used Kafka with CommandLineRunner. Maybe some adjustments are needed...

janchristian-haddorp commented 1 year ago

Regarding logging, try using the handle method. There are some tricky aspects regarding context propagation in a reactive context...

.handle((consumerRecord, sink) -> {
    LOGGER.info("<----- Would be great to see the traces here! But nothing " + consumerRecord.value().toLowerCase());
    sink.next(consumerRecord);
})
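
For background: in Reactor, handle() and tap() are the operators that restore ThreadLocal values (the tracing MDC included) from the subscriber Context when the context-propagation library is on the classpath, which is why the %X{traceId} in the log pattern can only be populated inside them. A minimal sketch combining this with the observation wrapper above; message and receiverObservation are carried over from the earlier snippet:

return Mono.just(message)
        .<String>handle((consumerRecord, sink) -> {
            // handle() runs with ThreadLocals restored from the Reactor Context,
            // so MDC-based log patterns can see traceId/spanId here
            LOGGER.info("record value: {}", consumerRecord.value());
            sink.next(consumerRecord.value().toLowerCase());
        })
        .doOnTerminate(receiverObservation::stop)
        .doOnError(receiverObservation::error)
        .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
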
patpatpat123 commented 1 year ago

Hello @janchristian-haddorp , just wanted to first thank you for your time answering this.

I took your advice and came up with this:

  public Flux<String> extract() {
        return kafkaReceiver
                .receive()
                .flatMap(message -> {
                    Observation receiverObservation =
                            KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
                                    KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                                    () ->
                                            new KafkaRecordReceiverContext(
                                                    message, "user.receiver", "myKafkaServer"),
                                    observationRegistry);
                    return Mono.just(message)
                            .flatMap(consumerRecord -> {
                                LOGGER.info("<----- Would be great to see the traces here! But nothing " + consumerRecord.value().toLowerCase());
                                return Mono.just(consumerRecord.value().toLowerCase());
                            }) 
                            .doOnTerminate(receiverObservation::stop)
                            .doOnError(receiverObservation::error)
                            .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
                });
    }

Unfortunately, the technical issue itself still remains: I am not able to see traces in the logs, nor in Zipkin.

May I ask if I understood your solution incorrectly?

Would you have a minimal solution handy where traces can be seen in logs and trace aggregation systems? I can maybe work my way up from there.

Good day!

janchristian-haddorp commented 1 year ago

@patpatpat123, I prepared some example code. https://github.com/janchristian-haddorp/opentelemetry/tree/reactor-kafka

If you want to connect traces from the publisher, make sure the traceparent header is set.

Did you set this property? management.tracing.sampling.probability=1.0
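
For reference, the Boot properties usually involved when reporting to Zipkin, assuming Boot's Zipkin auto-configuration is in play (the endpoint shown is the default):

# report all traces while debugging; the default rate is 0.1 (10%)
management.tracing.sampling.probability=1.0
# where the Zipkin reporter POSTs finished spans
management.zipkin.tracing.endpoint=http://localhost:9411/api/v2/spans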

patpatpat123 commented 1 year ago

Hello @janchristian-haddorp ,

Thank you for spending time on this sample, highly appreciated. I took the code out of the box, and it is working fine, very clear example.

Unfortunately, I am still not able to see traces in logs.

I took your code, out of the box, configured the application.properties with my respective values, and just added this one line:

logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss} ${PID} %-5level --- [%thread] [${spring.application.name:},%X{traceId:-},%X{spanId:-}] %logger{36} : %msg%n

Please note the presence of [${spring.application.name:},%X{traceId:-},%X{spanId:-}]

Would you mind also doing the same?

And the output of the log after adding this is:

2023-11-06 09:57:05 34684 INFO  --- [reactive-kafka-myconsumer-1] [demo-application,,] c.e.o.ReactiveKafkaConsumer : Processing Kafka record, ConsumerRecord(topic = mytopic, partition = 0, leaderEpoch = null, offset = 1012421465, CreateTime = 1699235820000, serialized key size = -1, serialized value size = 948, headers = {}, key = null, value = {"somekey":"somevalue"})
2023-11-06 09:57:05 34684 INFO  --- [reactive-kafka-myconsumer-1] [demo-application,,] c.e.o.ReactiveKafkaConsumer : Received record: ConsumerRecord(topic = mytopic, partition = 0, leaderEpoch = null, offset = 1012421465, CreateTime = 1699235820000, serialized key size = -1, serialized value size = 948, headers = {}, key = null, value = {"somekey":"somevalue"})

Therefore, it seems the handleReceiverRecord method with its LOGGER.info("Received record: {}", printReceiverRecord(receiverRecord)); and the logEventProcessing method with its receiverRecord -> LOGGER.info("Processing Kafka record, {}", printReceiverRecord(receiverRecord)); are not getting any traces.

Could you please help clarify this, or let me know what I did wrong?

Again, thank you for your help.

artembilan commented 1 year ago

Sorry for the delay.

Here is a sample about Spring Boot and Reactor Kafka tracing: https://github.com/artembilan/sandbox/tree/master/spring-boot-reactor-kafka-tracing.

I really see that only the Mono.handle() operator correlates tracing in logs:

2023-11-08T12:43:26.086-05:00  WARN [reactor-kafka-tracing,654bc8bd850d9a794f0ccac3d691f68f,57433f420d5b2a18] 24652 --- [r-kafka-group-1] gBootReactorKafkaTracingApplicationTests : ConsumerRecord(topic = test-topic, partition = 1, leaderEpoch = null, offset = 0, CreateTime = 1699465405742, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [RecordHeader(key = traceparent, value = [48, 48, 45, 54, 53, 52, 98, 99, 56, 98, 100, 56, 53, 48, 100, 57, 97, 55, 57, 52, 102, 48, 99, 99, 97, 99, 51, 100, 54, 57, 49, 102, 54, 56, 102, 45, 50, 101, 52, 101, 53, 53, 51, 97, 56, 97, 49, 102, 56, 57, 56, 54, 45, 48, 49])], isReadOnly = false), key = null, value = test data)

Probably a newer version of Reactor would do better for other operators. The

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>context-propagation</artifactId>
            <version>1.0.6</version>
        </dependency>

does not make any difference.
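
As a follow-up to the "newer version of Reactor" remark, and as an assumption rather than something verified here: reactor-core 3.5.3+ ships Hooks.enableAutomaticContextPropagation(), which restores ThreadLocals around all operators instead of only handle() and tap(), and Spring Boot 3.2 is expected to turn it on automatically. A sketch of enabling it manually on Boot 3.1 (class name illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Hooks;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        // requires io.micrometer:context-propagation on the classpath;
        // makes the tracing MDC visible in most operators, at some cost
        Hooks.enableAutomaticContextPropagation();
        SpringApplication.run(DemoApplication.class, args);
    }
}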

And here is a screenshot of my Zipkin: [screenshot]

I had to add this dependency to make the reporter work:

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
        </dependency>

I think this Reactor Kafka library should be fixed in regard to the BOOTSTRAP_SERVERS_CONFIG property, which has to be a List according to the Kafka client. The functionality in question, though, works as advertised.

Closing as Works as Designed

patpatpat123 commented 11 months ago

Hello @artembilan ,

No problem at all for the delay. If nothing else, thank you again for the PR and your sample.

We had a chance to download it, and it is working fine.

However, I am very puzzled by the spring-web dependency.

You said in above:

I had to add this dependency to make the reporter work:

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
        </dependency>

And indeed, removing this dependency still yields traces, but they will not be sent to Zipkin.

When this dependency is in, we do see:

2023-11-16T20:46:46.863+08:00 DEBUG [reactor-kafka-tracing,,] 4280 --- [reactor-kafka-tracing] [ender@452fc2eb}] [ ] o.s.web.client.RestTemplate : HTTP POST http://localhost:9411/api/v2/spans

I am having a very hard time understanding the need to depend on this.

Our current dependency graph looks as follows, a clean Reactor Kafka application:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.0-RC2</version>
</parent>
<properties>
    <java.version>21</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.3.22</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-tracing-bridge-brave</artifactId>
    </dependency>
    <dependency>
        <groupId>io.zipkin.reporter2</groupId>
        <artifactId>zipkin-reporter-brave</artifactId>
    </dependency>
</dependencies>

And having to add this spring-web dependency is just very, very awkward.

Could you please let me know why this dependency is needed?

Please let me know if you believe the question should be addressed to the zipkin team, or the micrometer team, or the spring web team.

Thank you

artembilan commented 11 months ago

Sure, @patpatpat123 !

I will help you to understand this, since I originally bumped into the same problem myself.

So, Zipkin is hosted somewhere, and it comes with a REST API to report our traces. We cannot call a REST API without some HTTP client. And that is indeed an expectation about the classpath in the ZipkinAutoConfiguration. In particular this one:

    @Import({ UrlConnectionSenderConfiguration.class, WebClientSenderConfiguration.class,
            RestTemplateSenderConfiguration.class })
    static class SenderConfiguration {

Their names speak for themselves: either the URL connection wrapper from Zipkin, or Spring Web's RestTemplate, or WebClient from Spring WebFlux.

I didn't choose spring-webflux since it comes with the server side as well; I had no idea what zipkin2.reporter.urlconnection.URLConnectionSender is or which jar it belongs to, so I chose spring-web since it comes only with the HTTP client stuff. The regular REST server side is in spring-webmvc. Perhaps that is what confused you, but you should keep in mind that the Zipkin reporter is not going to work without some HTTP client on your classpath.
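
For completeness: zipkin2.reporter.urlconnection.URLConnectionSender ships in the io.zipkin.reporter2:zipkin-sender-urlconnection artifact, so a sender that avoids any Spring web module should presumably be possible with (a version may be needed if the Boot BOM does not manage this artifact):

        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-sender-urlconnection</artifactId>
        </dependency>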

janchristian-haddorp commented 11 months ago

@patpatpat123, @artembilan, sounds logical. Normally I add the spring-boot-starter-webflux dependency, especially to avoid including any traditional Web MVC libs.

artembilan commented 11 months ago

@janchristian-haddorp ,

Still, you need to keep in mind that for our use case we need just the HTTP client part. I'm not sure it makes sense to bring in the whole WebFlux server if your microservice is not intended to be an HTTP server per se.