micrometer-metrics / micrometer-samples

Sample apps to demo Micrometer features
Apache License 2.0
65 stars 23 forks source link

reactive stream consumer : reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: Cannot invoke "io.micrometer.tracing.Span.context()" because the return value of "io.micrometer.tracing.Tracer.currentSpan()" is null #23

Open patpatpat123 opened 1 year ago

patpatpat123 commented 1 year ago

Hello team,

Just wanted to start an issue reporting an exception observed for the sample reactive stream consumer.

Just to avoid confusion, there is a kafka producer/consumer, there is a stream producer/consumer, there is a reactive stream producer/consumer. The issue is observed with the later.

I just took the code as it is, and just modified the grade file to a pom. Please correct me if I am wrong, but I do not believe to have forgotten something while migrating to the pom.

package com.example.micrometer;

import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

import java.util.function.Consumer;

@SpringBootApplication
public class StreamReactiveConsumerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(StreamReactiveConsumerApplication.class);

    public static void main(String... args) {
        new SpringApplicationBuilder(StreamReactiveConsumerApplication.class).web(WebApplicationType.NONE).run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        log.warn("Remember about calling <.subscribe()> at the end of your Consumer<Flux> bean!");
        log.warn("Remember about finishing the span manually before calling subscribe!");
    }

    @Bean
    Consumer<Flux<Message<String>>> channel(Tracer tracer, ObservationRegistry observationRegistry) {
        return flux -> flux.doOnNext(msg -> log.info("<ACCEPTANCE_TEST> <TRACE:{}> Hello from consumer",
                tracer.currentSpan().context().traceId())).subscribe();
    }

}

I changed the topic

spring:
  application:
    name: stream-reactive-consumer
  cloud:
    stream:
      bindings:
        channel-in-0.destination: myinput

# For tests
logging.pattern.level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>streamreactiveconsumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.1</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2022.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-reporter-brave</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

When running the reactive streaming consumer app, the app starts and run fine.

However, upon consuming a message, this error happens:

2023-01-29T13:03:13.315+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-2, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Found no committed offset for partition myinput-0
2023-01-29T13:03:13.322+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-2, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Resetting offset for partition myinput-0 to position FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1 rack: null)], epoch=0}}.
2023-01-29T13:03:13.328+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2  : anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4: partitions assigned: [myinput-0]
2023-01-29T13:05:18.316+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'stream-reactive-consumer.channel-in-0' has 0 subscriber(s).
2023-01-29T13:05:18.317+08:00 ERROR [stream-reactive-consumer,,] 33308 --- [container-0-C-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: Cannot invoke "io.micrometer.tracing.Span.context()" because the return value of "io.micrometer.tracing.Tracer.currentSpan()" is null
Caused by: java.lang.NullPointerException: Cannot invoke "io.micrometer.tracing.Span.context()" because the return value of "io.micrometer.tracing.Tracer.currentSpan()" is null
    at com.example.micrometer.StreamReactiveConsumerApplication.lambda$channel$0(StreamReactiveConsumerApplication.java:35) ~[classes/:na]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.5.1.jar:3.5.1]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.1.jar:3.5.1]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.1.jar:3.5.1]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.1.jar:3.5.1]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.5.1.jar:3.5.1]
    at reactor.core.publisher.SinkManyUnicastNoBackpressure.tryEmitNext(SinkManyUnicastNoBackpressure.java:120) ~[reactor-core-3.5.1.jar:3.5.1]
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.5.1.jar:3.5.1]
    at org.springframework.integration.util.IntegrationReactiveUtils.lambda$adaptSubscribableChannelToPublisher$8(IntegrationReactiveUtils.java:141) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.0.3.jar:6.0.3]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.0.3.jar:6.0.3]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.0.3.jar:6.0.3]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.0.3.jar:6.0.3]
    at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at io.micrometer.observation.Observation.observe(Observation.java:492) ~[micrometer-observation-1.10.2.jar:1.10.2]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.1.jar:6.0.1]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:464) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
    at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-2.0.0.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-2.0.0.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:66) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:461) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2859) ~[spring-kafka-3.0.1.jar:3.0.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2839) ~[spring-kafka-3.0.1.jar:3.0.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2762) ~[spring-kafka-3.0.1.jar:3.0.1]
    at io.micrometer.observation.Observation.observe(Observation.java:559) ~[micrometer-observation-1.10.2.jar:1.10.2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2760) ~[spring-kafka-3.0.1.jar:3.0.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2612) ~[spring-kafka-3.0.1.jar:3.0.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2498) ~[spring-kafka-3.0.1.jar:3.0.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2144) ~[spring-kafka-3.0.1.jar:3.0.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1499) ~[spring-kafka-3.0.1.jar:3.0.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1463) ~[spring-kafka-3.0.1.jar:3.0.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1338) ~[spring-kafka-3.0.1.jar:3.0.1]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2023-01-29T13:12:10.480+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-2, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Node -1 disconnected.
2023-01-29T13:12:11.485+08:00  INFO [stream-reactive-consumer,,] 33308 --- [pool-4-thread-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-3, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Node -1 disconnected.

This error is observed on the consumer, for message sent using the reactive steam producer, or the normal kafka producer. May I ask what did I miss please?

Thank you

cmergenthaler commented 1 year ago

Any updates on that? I have the exact same issue

cmergenthaler commented 1 year ago

FYI: I had to enable tracing for spring-kafka in order to fix the error

birbirsbirbirs commented 8 months ago

same error. I am trying to use in ChannelInterceptor. import io.micrometer.tracing.Tracer; String traceId = tracer.currentSpan().context().traceId(); String spanId = tracer.currentSpan().context().spanId();