micrometer-metrics / micrometer

An application observability facade for the most popular observability tools. Think SLF4J, but for observability.
https://micrometer.io
Apache License 2.0
4.46k stars 990 forks source link

When using KafkaMessageDrivenChannelAdapter with tracing it fails #5108

Closed hyunbeeds closed 1 month ago

hyunbeeds commented 5 months ago

Describe the bug

when using kafka and KafkaMessageDrivenChannelAdapter with tracing propagation, it fails.. because in micrometer it tries to translate b3 header to String.class..

stacktrace

 org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2946)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2887)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2854)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$57(KafkaMessageListenerContainer.java:2772)
    at io.micrometer.observation.Observation.observe(Observation.java:562)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2770)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2622)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2508)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2150)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1505)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1469)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1344)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: Incorrect type specified for header 'b3'. Expected [class java.lang.String] but actual type is [class [B]
    at org.springframework.messaging.MessageHeaders.get(MessageHeaders.java:216)
    at org.springframework.integration.support.management.observation.MessageReceiverContext.lambda$new$0(MessageReceiverContext.java:38)
    at io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler.lambda$onStart$0(PropagatingReceiverTracingObservationHandler.java:59)
    at io.micrometer.tracing.otel.bridge.OtelPropagator$1.get(OtelPropagator.java:73)
    at io.opentelemetry.extension.trace.propagation.B3PropagatorExtractorSingleHeader.extractSpanContextFromSingleHeader(B3PropagatorExtractorSingleHeader.java:36)
    at io.opentelemetry.extension.trace.propagation.B3PropagatorExtractorSingleHeader.extract(B3PropagatorExtractorSingleHeader.java:31)
    at io.opentelemetry.extension.trace.propagation.B3Propagator.lambda$extract$0(B3Propagator.java:117)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:1002)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
    at io.opentelemetry.extension.trace.propagation.B3Propagator.extract(B3Propagator.java:123)
    at org.springframework.boot.actuate.autoconfigure.tracing.CompositeTextMapPropagator.lambda$extract$2(CompositeTextMapPropagator.java:110)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.AbstractList$RandomAccessSpliterator.tryAdvance(AbstractList.java:706)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
    at org.springframework.boot.actuate.autoconfigure.tracing.CompositeTextMapPropagator.extract(CompositeTextMapPropagator.java:112)
    at io.micrometer.tracing.otel.bridge.OtelPropagator.extract(OtelPropagator.java:65)
    at io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler.onStart(PropagatingReceiverTracingObservationHandler.java:58)
    at io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler.onStart(PropagatingReceiverTracingObservationHandler.java:35)
    at io.micrometer.observation.ObservationHandler$FirstMatchingCompositeObservationHandler.onStart(ObservationHandler.java:149)
    at io.micrometer.observation.SimpleObservation.notifyOnObservationStarted(SimpleObservation.java:222)
    at io.micrometer.observation.SimpleObservation.start(SimpleObservation.java:169)
    at io.micrometer.observation.Observation.observe(Observation.java:493)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:469)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2873)
    ... 13 more

kafka headers

{
    "b3": "d8496f9a548268a7e1c7a0fc40f6789c-2d3821b6c980345a-1",
    "spring_json_header_types": "{\"SENDER\":\"java.lang.String\",\"PAYLOAD_TYPE\":\"java.lang.String\"}",
    "SENDER": "payment",
    "PAYLOAD_TYPE": "PartnerSpaceBillingUserEmailUpdatedEvent"
}

if spring_json_header_types contains information of b3 header, it doesn't produces error. (but in the issue https://github.com/spring-cloud/spring-cloud-stream/issues/2905, it seems natural that spring_json_header_types not contains information of b3 header)

{
"b3": "d8496f9a548268a7e1c7a0fc40f6789c-2d3821b6c980345a-1",
"spring_json_header_types": "{\"SENDER\":\"java.lang.String\",\"PAYLOAD_TYPE\":\"java.lang.String\", \"b3\":\"java.lang.String\"}",
"SENDER": "payment",
"PAYLOAD_TYPE": "PartnerSpaceBillingUserEmailUpdatedEvent"
}

Environment

To Reproduce using two spring boot components with observation enabled,

  1. component A produces kafka messages
  2. component B consumes kafka messages
  3. inside component B, it tries to route messages to specific channels with header using KafkaMessageDrivenChannelAdapter
  4. error occurs

properties related with kafka and micrometer tracing

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
      retries: 1
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 1000
      max-poll-records: 15
      auto-offset-reset: latest
      group-id: ***
  integration:
    management:
      observation-patterns: "*"
      default-logging-enabled: true

management:
  tracing:
    sampling:
      probability: 1.0
    propagation:
      type: b3
      consume: b3
      produce: b3
maslailleron commented 5 months ago

I have the same problem.

artembilan commented 5 months ago

This has nothing to do with Micrometer by itself.

@jonatan-ivanov ,

please, consider to close this in favor of: https://github.com/spring-projects/spring-integration/issues/9191.

Thanks

shakuzen commented 1 month ago

Closing per https://github.com/micrometer-metrics/micrometer/issues/5108#issuecomment-2135913066