grpc-ecosystem / grpc-spring

Spring Boot starter module for gRPC framework.
https://grpc-ecosystem.github.io/grpc-spring/
Apache License 2.0
3.49k stars 819 forks source link

Failed to release a message: UnpooledSlicedByteBuf(freed) #1109

Closed cdgjzzy closed 4 months ago

cdgjzzy commented 4 months ago

The bug

When I use gRPC streaming responses, I handle it like this:

observers.forEach((context, observer) -> {
    if (!context.isCancelled() && observer.isReady()) {
        observer.onNext(data);
    }
});

This method gets triggered about 1500 times per second. Under such concurrent circumstances, there are issues with gRPC failing to send messages.

Stacktrace and logs

2024-05-13 09:44:10.412 [grpc-nio-worker-ELG-1-7] WARN  io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil.safeRelease - Failed to release a message: UnpooledSlicedByteBuf(freed)
io.grpc.netty.shaded.io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.grpc.netty.shaded.io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
    at io.grpc.netty.shaded.io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
    at io.grpc.netty.shaded.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
    at io.grpc.netty.shaded.io.netty.buffer.CompositeByteBuf$Component.free(CompositeByteBuf.java:1959)
    at io.grpc.netty.shaded.io.netty.buffer.CompositeByteBuf.deallocate(CompositeByteBuf.java:2264)
    at io.grpc.netty.shaded.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:111)
    at io.grpc.netty.shaded.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
    at io.grpc.netty.shaded.io.netty.buffer.AbstractDerivedByteBuf.release0(AbstractDerivedByteBuf.java:98)
    at io.grpc.netty.shaded.io.netty.buffer.AbstractDerivedByteBuf.release(AbstractDerivedByteBuf.java:94)
    at io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90)
    at io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:116)
    at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:280)
    at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:361)
    at io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:438)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
    at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:197)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:925)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:967)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.flush(AbstractChannel.java:254)
    at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:143)
    at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.access$000(WriteQueue.java:35)
    at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$1.run(WriteQueue.java:47)
    at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:173)
    at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
    at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

Client error:
On error,listening end
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
    at io.grpc.Status.asRuntimeException(Status.java:533)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Invalid protobuf byte sequence
    at io.grpc.Status.asRuntimeException(Status.java:525)
    at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:240)
    at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:134)
    at io.grpc.MethodDescriptor.parseResponse(MethodDescriptor.java:284)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:657)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:644)
    ... 5 common frames omitted
Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
    at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:115)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readStringRequireUtf8(CodedInputStream.java:822)
    at com.google.protobuf.StringValue$Builder.mergeFrom(StringValue.java:386)
    at com.google.protobuf.StringValue$1.parsePartialFrom(StringValue.java:517)
    at com.google.protobuf.StringValue$1.parsePartialFrom(StringValue.java:509)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:86)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
    at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parseFrom(ProtoLiteUtils.java:245)
    at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:237)
    ... 9 common frames omitted

The application's environment

Additional context

If I control the sending frequency with a queue, then this exception does not occur.

ST-DDT commented 4 months ago

Currently the only idea I have why this might happen is a buffer reuse.

Other than that, this does appear to be an issue with grpc-spring but grpc java itself. Please report this upstream and maybe also iclude the protobuf type definition for the affected type. Please link the upstream issue here in case anybody encounters a similar issue in the future.

cdgjzzy commented 4 months ago

The gRPC-Java solution is to upgrade the version; they have indicated that the issue has been fixed in the new version. grpc-java #11201

ST-DDT commented 4 months ago

Thanks for the update