rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0
2.35k stars 354 forks source link

RSocketRequesterTracingObservationHandler producing netty buffer LEAKs #1101

Open kevinat opened 7 months ago

kevinat commented 7 months ago

I'm getting some warnings about memory leaks with Spring boot 3.2, and after searching I found that according to: Netty reports LEAK while using reactor (Rsocket) with Sleuth #2256 there seems to be a bug fix that hasn't migrated.

Here is @OlegDokuka 's fix migrated to RSocketRequesterTracingObservationHandler:

--- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/RSocketRequesterTracingObservationHandler.java
+++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/RSocketRequesterTracingObservationHandler.java
@@ -23,13 +23,17 @@ import io.micrometer.tracing.Tracer;
 import io.micrometer.tracing.handler.TracingObservationHandler;
 import io.micrometer.tracing.internal.EncodingUtils;
 import io.micrometer.tracing.propagation.Propagator;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
 import io.rsocket.Payload;
+import io.rsocket.metadata.CompositeMetadataCodec;
 import io.rsocket.metadata.TracingMetadataCodec;
-import java.util.HashSet;
+import io.rsocket.metadata.WellKnownMimeType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.HashSet;
+
 public class RSocketRequesterTracingObservationHandler
     implements TracingObservationHandler<RSocketContext> {
     private static final Logger log =
@@ -115,17 +119,25 @@ public class RSocketRequesterTracingObservationHandler
         long[] spanId = EncodingUtils.fromString(traceContext.spanId());
         long[] parentSpanId = EncodingUtils.fromString(traceContext.parentId());
         boolean isTraceId128Bit = traceIds.length == 2;
+        final ByteBufAllocator allocator = newMetadata.alloc();
         if (isTraceId128Bit) {
-            TracingMetadataCodec.encode128(
-                newMetadata.alloc(),
-                traceIds[0],
-                traceIds[1],
-                spanId[0],
-                EncodingUtils.fromString(traceContext.parentId())[0],
-                flags);
+            CompositeMetadataCodec.encodeAndAddMetadata(newMetadata,
+                allocator, WellKnownMimeType.MESSAGE_RSOCKET_TRACING_ZIPKIN,
+                TracingMetadataCodec.encode128(
+                    allocator,
+                    traceIds[0],
+                    traceIds[1],
+                    spanId[0],
+                    EncodingUtils.fromString(traceContext.parentId())[0],
+                    flags));
         } else {
-            TracingMetadataCodec.encode64(
-                newMetadata.alloc(), traceIds[0], spanId[0], parentSpanId[0], flags);
+            CompositeMetadataCodec.encodeAndAddMetadata(newMetadata,
+                allocator, WellKnownMimeType.MESSAGE_RSOCKET_TRACING_ZIPKIN,
+                TracingMetadataCodec.encode64(
+                    allocator, traceIds[0], spanId[0], parentSpanId[0], flags));
         }
     }
 }