open-telemetry / opentelemetry-java-instrumentation

OpenTelemetry auto-instrumentation and instrumentation libraries for Java
https://opentelemetry.io
Apache License 2.0

ConcurrentKafkaListenerContainerFactoryPostProcessor overriding interceptor #11793

Open micd opened 1 month ago

micd commented 1 month ago

Describe the bug

Hi, in ConcurrentKafkaListenerContainerFactoryPostProcessor there are two lines

listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor());
listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor());

that override any custom interceptors already set on the container factory. Would it make sense, and would it be possible, to find any beans that implement RecordInterceptor and either call SpringKafkaTelemetry#createRecordInterceptor with the decorator parameter or use CompositeRecordInterceptor? That way both interceptors would be registered.
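
A minimal sketch of the decorator idea described above, using hypothetical stand-in types rather than the real Spring Kafka or OpenTelemetry classes. `Interceptor` plays the role of `RecordInterceptor`, and `tracing(delegate, events)` plays the role of an instrumented interceptor created with a decorated delegate. All names here are assumptions made for illustration; the only point is that wrapping keeps the custom interceptor in the chain instead of replacing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class InterceptorChainSketch {

  /** Hypothetical stand-in for Spring Kafka's RecordInterceptor: receives a record, returns it. */
  public interface Interceptor extends UnaryOperator<String> {}

  /**
   * Hypothetical stand-in for an instrumented interceptor that accepts a delegate.
   * It logs "span start" and "span end" around the delegate call instead of replacing it.
   */
  public static Interceptor tracing(Interceptor delegate, List<String> events) {
    return message -> {
      events.add("span start");
      try {
        // A null delegate means no custom interceptor was registered.
        return delegate == null ? message : delegate.apply(message);
      } finally {
        events.add("span end");
      }
    };
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    // The user's custom interceptor.
    Interceptor custom = message -> {
      events.add("custom");
      return message.toUpperCase();
    };
    // Both interceptors run: the tracing one wraps the custom one.
    Interceptor combined = tracing(custom, events);
    System.out.println(combined.apply("payload"));
    System.out.println(events);
  }
}
```

With the real classes, the same shape would require the post-processor to know about the custom interceptor, which is the missing piece discussed in this issue.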

Steps to reproduce

  1. Add opentelemetry to project and enable sdk
  2. Create ConcurrentKafkaListenerContainerFactory with custom interceptor
  3. Run application - custom interceptor will be replaced by InstrumentedRecordInterceptor

Expected behavior

Both custom interceptor and InstrumentedRecordInterceptor should be added to ConcurrentKafkaListenerContainerFactory

Actual behavior

Custom interceptor is replaced by InstrumentedRecordInterceptor

Javaagent or library instrumentation version

1.40.0

Environment

JDK: OS:

Additional context

No response

laurit commented 1 month ago

cc @zeitlinger

zeitlinger commented 1 month ago

there's no getter for the interceptors

but I could give you the interceptors for you to combine like this

Subject: [PATCH] api diff
---
Index: instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java b/instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java
--- a/instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java    (revision 746ef018613959ae67fbf21e608d3a879892248c)
+++ b/instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java    (date 1721055942218)
@@ -5,23 +5,17 @@

 package io.opentelemetry.instrumentation.spring.autoconfigure.v2.internal.instrumentation.kafka;

-import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
-import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
-import org.springframework.beans.factory.ObjectProvider;
+import java.util.function.Supplier;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

 class ConcurrentKafkaListenerContainerFactoryPostProcessor implements BeanPostProcessor {

-  private final ObjectProvider<OpenTelemetry> openTelemetryProvider;
-  private final ObjectProvider<ConfigProperties> configPropertiesProvider;
+  private final Supplier<SpringKafkaTelemetry> springKafkaTelemetry;

-  ConcurrentKafkaListenerContainerFactoryPostProcessor(
-      ObjectProvider<OpenTelemetry> openTelemetryProvider,
-      ObjectProvider<ConfigProperties> configPropertiesProvider) {
-    this.openTelemetryProvider = openTelemetryProvider;
-    this.configPropertiesProvider = configPropertiesProvider;
+  ConcurrentKafkaListenerContainerFactoryPostProcessor(Supplier<SpringKafkaTelemetry> springKafkaTelemetry) {
+    this.springKafkaTelemetry = springKafkaTelemetry;
   }

   @Override
@@ -32,13 +26,7 @@

     ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory =
         (ConcurrentKafkaListenerContainerFactory<?, ?>) bean;
-    SpringKafkaTelemetry springKafkaTelemetry =
-        SpringKafkaTelemetry.builder(openTelemetryProvider.getObject())
-            .setCaptureExperimentalSpanAttributes(
-                configPropertiesProvider
-                    .getObject()
-                    .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
-            .build();
+    SpringKafkaTelemetry springKafkaTelemetry = this.springKafkaTelemetry.get();
     listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor());
     listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor());

Index: instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java b/instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java
--- a/instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java   (revision 746ef018613959ae67fbf21e608d3a879892248c)
+++ b/instrumentation/spring/spring-boot-autoconfigure-2/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/v2/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java   (date 1721055942206)
@@ -8,6 +8,7 @@
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
 import io.opentelemetry.instrumentation.spring.autoconfigure.v2.internal.ConditionalOnEnabledInstrumentation;
+import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
 import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@@ -40,6 +41,18 @@
           ObjectProvider<OpenTelemetry> openTelemetryProvider,
           ObjectProvider<ConfigProperties> configPropertiesProvider) {
     return new ConcurrentKafkaListenerContainerFactoryPostProcessor(
-        openTelemetryProvider, configPropertiesProvider);
+        () -> getTelemetry(openTelemetryProvider, configPropertiesProvider)
+    );
+  }
+
+   @Bean
+   static SpringKafkaTelemetry getTelemetry(ObjectProvider<OpenTelemetry> openTelemetryProvider,
+             ObjectProvider<ConfigProperties> configPropertiesProvider) {
+     return SpringKafkaTelemetry.builder(openTelemetryProvider.getObject())
+         .setCaptureExperimentalSpanAttributes(
+             configPropertiesProvider
+                 .getObject()
+                 .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
+         .build();
   }
 }

would this work?

micd commented 1 month ago

Thanks @zeitlinger. I think this won't help, as there is still this postprocessor that sets and overrides the interceptors. This is the crucial part here.

Yeah, I didn't check properly; there is no getter, so I don't know whether we can build something better with the current spring kafka components.

Unfortunately, I don't have a good solution to implement in the opentelemetry lib. Maybe it's not the right place for the fix. Or maybe you will find a better solution.

Anyway, I appreciate the quick response, thank you. If you find another solution, that would be great.

zeitlinger commented 1 month ago

> Thanks @zeitlinger. I think this won't help, as there is still this postprocessor that sets and overrides the interceptors. This is the crucial part here.

I think we can handle this - just disable the automatic kafka instrumentation

micd commented 1 month ago

Yep, this would definitely help if we could disable it with a property. The downside, and the reason I haven't proposed it, is that the reasoning and use cases might not be easy for end users to understand without diving deeper into the code. But maybe it's not a big problem and we can try to do it this way. What do you think?

zeitlinger commented 1 month ago

> Yep, this would definitely help if we could disable it with a property. The downside, and the reason I haven't proposed it, is that the reasoning and use cases might not be easy for end users to understand without diving deeper into the code.

I agree - but I want to stick to the API that is supported by kafka - so we can remain stable. You could ask the spring team to add this functionality.

micd commented 1 month ago

Yeah, changing the spring kafka module is also an option. I will probably propose this change to them. Anyway, would you be willing to implement the change you've proposed, with the ability to turn it on/off with a property?

laurit commented 1 month ago

Perhaps we should rewrite the spring boot kafka instrumentation in a different way. Besides not working with custom interceptors, there is another issue: batch interceptors do not work correctly with retries. I think we could use https://docs.spring.io/spring-boot/api/java/org/springframework/boot/autoconfigure/kafka/DefaultKafkaConsumerFactoryCustomizer.html to add a post processor for DefaultKafkaConsumerFactory. This post processor could wrap the returned consumer. I don't know whether there are any differences in telemetry between the kafka interceptor and wrapper based instrumentations. If we just want to make custom interceptors work, we could consider proxying the ConcurrentKafkaListenerContainerFactory so that we can intercept calls to setRecordInterceptor and setBatchInterceptor.
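
To make the last idea concrete, here is a rough sketch of intercepting the setter so that an interceptor set later is decorated rather than lost. It uses hypothetical stand-in classes (`ContainerFactory`, `InstrumentedContainerFactory`) and plain `UnaryOperator` values in place of the real Spring Kafka factory and interceptors, so it only illustrates the mechanism under those assumptions; it does not show how such a proxy would be registered in a Spring context.

```java
import java.util.function.UnaryOperator;

public class SetterInterceptingFactorySketch {

  /** Hypothetical stand-in for the container factory: it has a setter but no getter. */
  public static class ContainerFactory {
    private UnaryOperator<String> recordInterceptor;

    public void setRecordInterceptor(UnaryOperator<String> interceptor) {
      this.recordInterceptor = interceptor;
    }

    /** Runs the configured interceptor, if any, on an incoming message. */
    public String dispatch(String message) {
      return recordInterceptor == null ? message : recordInterceptor.apply(message);
    }
  }

  /** Hypothetical proxy: every interceptor passed to the setter is wrapped by the decorator. */
  public static class InstrumentedContainerFactory extends ContainerFactory {
    private final UnaryOperator<UnaryOperator<String>> decorator;

    public InstrumentedContainerFactory(UnaryOperator<UnaryOperator<String>> decorator) {
      this.decorator = decorator;
      // Instrument even when no custom interceptor is ever set.
      super.setRecordInterceptor(decorator.apply(UnaryOperator.identity()));
    }

    @Override
    public void setRecordInterceptor(UnaryOperator<String> interceptor) {
      // Decorate instead of overriding, so the custom interceptor still runs.
      super.setRecordInterceptor(decorator.apply(interceptor));
    }
  }

  public static void main(String[] args) {
    ContainerFactory factory =
        new InstrumentedContainerFactory(
            delegate -> message -> "traced(" + delegate.apply(message) + ")");
    // User code sets its own interceptor after the instrumentation was installed.
    factory.setRecordInterceptor(String::toUpperCase);
    System.out.println(factory.dispatch("payload"));
  }
}
```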

zeitlinger commented 1 month ago

> If we just want to make custom interceptors work, we could consider proxying

that won't work with the spring starter

micd commented 3 weeks ago

> > Thanks @zeitlinger. I think this won't help, as there is still this postprocessor that sets and overrides the interceptors. This is the crucial part here.
>
> I think we can handle this - just disable the automatic kafka instrumentation

Hey @laurit @zeitlinger, yeah, the spring kafka team would also be in favor of that. From their perspective, using postprocessors in this way is not recommended. Would you be able to make that change?

zeitlinger commented 2 weeks ago

a PR for this change would be welcome :smile: