grpc / grpc-java

The Java gRPC implementation. HTTP/2 based RPC
https://grpc.io/docs/languages/java/
Apache License 2.0

Add best-effort detection of queuing to ThreadlessExecutor after draining complete #3537

Open jhspaybar opened 7 years ago

jhspaybar commented 7 years ago

Please answer these questions before submitting your issue.

What version of gRPC are you using?

1.3.0

What JVM are you using (java -version)?

java version "1.8.0_144" Java(TM) SE Runtime Environment (build 1.8.0_144-b01) Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)

What did you do?

If possible, provide a recipe for reproducing the error.

During use of a specific client, Netty begins to detect memory leaks, and my app eventually runs out of direct memory. The OutOfDirectMemory errors begin much later than the leaks are first detected and manifest mostly as an immediate UNKNOWN status from the blocking stub. The stack trace of what that looks like is below.

at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:227)
        at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:208)
        at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:141)
...company specific stack trace removed
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 520093998, max: 536870912)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:624)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:578)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:718)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:707)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:239)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:287)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:170)
        at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:131)
        at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:73)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more

I'm using multiple gRPC and Netty clients, and even after leaks are detected, and even after the stack trace above begins to be logged, these Netty and gRPC clients appear to continue operating normally as long as their existing connections are maintained. Attempts to create new connections fail, however, and the failed TCP connection establishments can be detected at the OS layer and show up in metrics dashboards.

Here is the report from the leak detector:

ERROR : [grpc-default-worker-ELG-1-84] [] LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
WARNING: 47 leak records were discarded because the leak record count is limited to 4. Use system property io.netty.leakDetection.maxRecords to increase the limit.
Recent access records: 4
#4:
        io.netty.buffer.AdvancedLeakAwareByteBuf.readByte(AdvancedLeakAwareByteBuf.java:396)
        io.netty.handler.codec.http2.Http2CodecUtil.readUnsignedInt(Http2CodecUtil.java:204)
        io.netty.handler.codec.http2.DefaultHttp2FrameReader.processHeaderState(DefaultHttp2FrameReader.java:192)
        io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:148)
        io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        io.grpc.netty.FixedHttp2ConnectionDecoder.decodeFrame(FixedHttp2ConnectionDecoder.java:119)
        io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:341)
        io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:401)
        io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
        io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        java.lang.Thread.run(Thread.java:748)
#3:
        io.netty.buffer.AdvancedLeakAwareByteBuf.readUnsignedByte(AdvancedLeakAwareByteBuf.java:402)
        io.netty.handler.codec.http2.DefaultHttp2FrameReader.processHeaderState(DefaultHttp2FrameReader.java:191)
        io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:148)
        io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        io.grpc.netty.FixedHttp2ConnectionDecoder.decodeFrame(FixedHttp2ConnectionDecoder.java:119)
        io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:341)
        io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:401)
        io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
        io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        java.lang.Thread.run(Thread.java:748)
#2:
        io.netty.buffer.AdvancedLeakAwareByteBuf.readByte(AdvancedLeakAwareByteBuf.java:396)
        io.netty.handler.codec.http2.DefaultHttp2FrameReader.processHeaderState(DefaultHttp2FrameReader.java:190)
        io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:148)
        io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        io.grpc.netty.FixedHttp2ConnectionDecoder.decodeFrame(FixedHttp2ConnectionDecoder.java:119)
        io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:341)
        io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:401)
        io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
        io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        java.lang.Thread.run(Thread.java:748)
Created at:
        io.netty.util.ResourceLeakDetector.track(ResourceLeakDetector.java:229)
        io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:296)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:170)
        io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:131)
        io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:73)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
        io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        java.lang.Thread.run(Thread.java:748)

Most interesting to me is io.grpc.netty.FixedHttp2ConnectionDecoder.decodeFrame(FixedHttp2ConnectionDecoder.java:119) in the Netty report above. It looks like this method declares that it can throw, and maybe it is throwing and something gets into a weird state?

The server implementation looks like approximately this:

        try {
            responseObserver.onNext(response);
        } catch (IllegalArgumentException e) {
            responseObserver.onError(Status.INVALID_ARGUMENT.withCause(e).asException());
        } catch (Exception e) {
            responseObserver.onError(Status.UNKNOWN.withCause(e).asException());
        } finally {
            responseObserver.onCompleted();
        }

I'm wondering if an onCompleted() call coming in behind an onError() call might race and cause an issue with the header frames getting flushed and then decoded. I plan to move the onCompleted() up into the try block and give this another attempt, but I'm pretty stumped as to what's happening here and would like some advice.
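
For reference, moving onCompleted() into the try block would look roughly like this (same approximate handler as above, so onCompleted() only runs on the success path instead of also after an onError()):

        try {
            responseObserver.onNext(response);
            // Only complete the call if onNext() succeeded; the catch blocks
            // below terminate the call with onError() instead.
            responseObserver.onCompleted();
        } catch (IllegalArgumentException e) {
            responseObserver.onError(Status.INVALID_ARGUMENT.withCause(e).asException());
        } catch (Exception e) {
            responseObserver.onError(Status.UNKNOWN.withCause(e).asException());
        }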

Thank you in advance.

jhspaybar commented 7 years ago

If it wasn't super clear in the description above, only one client out of 10+ in the application behaves this way. If I send no traffic to that single client, the app runs fine forever. At this point the only thing that looks obviously different from "normal" with that client is the server impl shown above, but it's possible this is a red herring.

jhspaybar commented 7 years ago

WARNING: 68 leak records were discarded because the leak record count is limited to 4. Use system property io.netty.leakDetection.maxRecords to increase the limit.
Recent access records: 5
#5:
        io.netty.buffer.AdvancedLeakAwareByteBuf.readBytes(AdvancedLeakAwareByteBuf.java:498)
        io.grpc.netty.NettyReadableBuffer.readBytes(NettyReadableBuffer.java:75)
        io.grpc.internal.CompositeReadableBuffer$3.readInternal(CompositeReadableBuffer.java:110)
        io.grpc.internal.CompositeReadableBuffer$ReadOperation.read(CompositeReadableBuffer.java:231)
        io.grpc.internal.CompositeReadableBuffer.execute(CompositeReadableBuffer.java:189)
        io.grpc.internal.CompositeReadableBuffer.readBytes(CompositeReadableBuffer.java:106)
        io.grpc.internal.ReadableBuffers$BufferInputStream.read(ReadableBuffers.java:342)
        io.grpc.protobuf.lite.ProtoLiteUtils$2.parse(ProtoLiteUtils.java:148)
        io.grpc.protobuf.lite.ProtoLiteUtils$2.parse(ProtoLiteUtils.java:94)
        io.grpc.MethodDescriptor.parseResponse(MethodDescriptor.java:271)
        io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessageRead.runInContext(ClientCallImpl.java:489)
        io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
        io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:117)
        io.grpc.stub.ClientCalls$ThreadlessExecutor.waitAndDrain(ClientCalls.java:587)
        io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:135)

Also came across one of these in the netty reports.

carl-mastrangelo commented 7 years ago

Can you give it a shot on 1.6?

jhspaybar commented 7 years ago

It'll be tough to try it out in the environment it lives in right now. We're mostly stuck on 1.3 until shaded Netty gets released in a future gRPC release; there's too much stuff internally that expects exactly Netty 4.1.8.Final at the moment. I'll see if I can recreate the situation with a more minimal test case and then upgrade it.

jhspaybar commented 7 years ago

We fixed the weirdness in the server handler and it had no impact. Still attempting to reproduce in a more minimal environment at the moment.

jhspaybar commented 7 years ago

@carl-mastrangelo quick question for you. We have retry and hedged-request interceptors, and it appears there may be an interaction here that leads to this leak, possibly through corruption. The behavior I'm concerned about is that both the retry and the hedge interceptor hold a reference to the "next" Channel object so they can later call next.newCall(...) again. I have a theory that this may not actually be threadsafe and that we sometimes end up calling these at about the same time, leading to corruption. Does this theory make sense?
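
To illustrate the shape I mean, here's a boiled-down example (this is not our real interceptor, just the pattern in question, with made-up names):

    import io.grpc.CallOptions;
    import io.grpc.Channel;
    import io.grpc.ClientCall;
    import io.grpc.ClientInterceptor;
    import io.grpc.MethodDescriptor;

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    final class RetainsChannelInterceptor implements ClientInterceptor {
      private final ScheduledExecutorService retryScheduler =
          Executors.newSingleThreadScheduledExecutor();

      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        // The interceptor keeps using the Channel it was handed ("next") after
        // interceptCall() returns, possibly from a different thread than the
        // one that started the original call.
        retryScheduler.schedule(
            () -> {
              ClientCall<ReqT, RespT> hedge = next.newCall(method, callOptions);
              // ...start and manage the hedge/retry attempt here...
            },
            50, TimeUnit.MILLISECONDS);
        return next.newCall(method, callOptions);
      }
    }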

carl-mastrangelo commented 7 years ago

Interceptors should be thread safe (they act similarly to channels, which must be threadsafe). Without seeing your interceptor I cannot say for sure. I don't think anything is reused per channel. One thing though: I think the InputStream used for marshalling could be reused up until 1.6, but we auto-close it in 1.7. If you are holding a ref to the input stream, that might break.

elandau commented 6 years ago

We've finally been able to figure out that this was the result of the interaction between our hedged-request interceptor and the ThreadlessExecutor used by the blocking call (this leak doesn't happen for the async stub). Basically, once the first response comes in, all runnables for the other calls are no longer processed, since the blocking stub exits the processing loop on the ThreadlessExecutor.

Can you offer any guidance on how to continue processing these runnables? A quick fix would be to use a cached thread pool executor on the CallOptions when hedged requests are enabled, but that would defeat the benefit of having the calling thread process the runnables in the blocking stub.

ejona86 commented 6 years ago

Wow. That's... an interesting case. Nice job tracking that down. I created #3557 for the ThreadlessExecutor to help detect this case earlier.

You could use a cached thread pool, although you still need to run Runnables on the ThreadlessExecutor to wake up the blocked thread. That seems hacky and does lose the benefit of the ThreadlessExecutor.

Alternative "fix": make your own Executor that delegates to the ThreadlessExecutor, until the interceptor propagates onClose() at which point it swaps execute()s to delegate to a cached thread pool. Although you may not care, note this does make use of the implementation detail that waitAndDrain() fully-drains the queue instead of checking responseFuture.isDone() after every Runnable; that's because it should ideally do the swap atomically after onClose() is queued, which is infeasible. You could however wrap your executor in a SerializingExecutor to guarantee that there are no execute()s between queuing and execution of the onClose().

I think the problem is inherent to the hedging, and not the ThreadlessExecutor. Any call site could provide an Executor and shut it down after onClose(). So it seems like hedging inherently needs its own thread at times.

elandau commented 6 years ago

Definitely an inherent problem with hedging, but I also like the idea of adding a sanity check to the ThreadlessExecutor so similar leaks may be detected in the future.

Thanks for the alternative 'fix'. I'll take a shot at creating my own Executor that keeps track of when onClose was called.