grpc / grpc-java

The Java gRPC implementation. HTTP/2 based RPC
https://grpc.io/docs/languages/java/
Apache License 2.0

How to check isCancelled when using gRPC streaming? #7984

Closed: phamtai97 closed this issue 3 years ago

phamtai97 commented 3 years ago

I implemented a back-pressure model, but I have the following problem: before I write a message to the stream, I check isCancelled() and it returns true. The documentation says that isCancelled() is true when the peer has closed the stream, so I checked the client, and it does not close the stream.

So I suspected that keepAliveTimeout on the server or the client was set too low. I increased its value, but isCancelled() still returns true.

How should I handle this to avoid the problem? Thanks, all.

voidzcy commented 3 years ago

Can you post a code snippet, along with the keepAliveTimeout values you used for both the client and the server? Note that keepAliveTimeout should be at least several times the RTT.
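
To make the rule of thumb above concrete, here is a minimal sketch in plain Java that compares a configured keepalive timeout against a measured round-trip time. The class `KeepAliveCheck`, the method `isTimeoutSafe`, the 80 ms RTT, and the factor of 3 are illustrative assumptions, not gRPC APIs or recommended values; how many multiples of the RTT are enough is left to the reader.

```java
import java.time.Duration;

/** Hypothetical helper for illustration; not part of grpc-java. */
public final class KeepAliveCheck {
    private KeepAliveCheck() {}

    /** Returns true when the timeout is at least minMultiple round trips long. */
    public static boolean isTimeoutSafe(Duration keepAliveTimeout, Duration rtt, int minMultiple) {
        if (minMultiple < 1 || rtt.isNegative() || rtt.isZero()) {
            throw new IllegalArgumentException("rtt must be positive and minMultiple >= 1");
        }
        // Compare the timeout against rtt * minMultiple.
        return keepAliveTimeout.compareTo(rtt.multipliedBy(minMultiple)) >= 0;
    }

    public static void main(String[] args) {
        Duration rtt = Duration.ofMillis(80); // assumed measurement
        // 100 ms is shorter than 3 * 80 ms, so a slow ping ack could kill the connection.
        System.out.println(isTimeoutSafe(Duration.ofMillis(100), rtt, 3));
        // 20 s leaves plenty of room for several round trips.
        System.out.println(isTimeoutSafe(Duration.ofSeconds(20), rtt, 3));
    }
}
```

A check like this could run at startup, next to the code that reads the keepalive settings from configuration.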

phamtai97 commented 3 years ago

This is client code:

    ManagedChannel channel =
        VertxChannelBuilder.forTarget(Vertx.vertx(), "localhost:8545")
            .usePlaintext(true)
            .keepAliveTime(60, TimeUnit.SECONDS)
            .keepAliveWithoutCalls(true)
            .build();

    LogServiceGrpc.LogServiceVertxStub stub = LogServiceGrpc.newVertxStub(channel);

This is server code:

    ServerInterceptor wrappedInterceptor = BlockingServerInterceptor.wrap(vertx, serverInterceptor);
    VertxServerBuilder serverBuilder = VertxServerBuilder.forPort(vertx, grpcConfig.getPort());
    serverBuilder
        .nettyBuilder()
        .withChildOption(ChannelOption.SO_SNDBUF, grpcConfig.getTcpSendBufferSizeInKiB() * 1024)
        .withChildOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .permitKeepAliveWithoutCalls(grpcConfig.isPermitKeepAliveWithoutCalls())
        .permitKeepAliveTime(grpcConfig.getPermitKeepAliveTimeMs(), TimeUnit.MILLISECONDS)
        .maxConnectionAge(grpcConfig.getMaxConnectionAgeMs(), TimeUnit.MILLISECONDS)
        .maxConnectionIdle(grpcConfig.getMaxConnectionIdleMs(), TimeUnit.MILLISECONDS)
        .keepAliveTime(grpcConfig.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS)
        .keepAliveTimeout(grpcConfig.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS)
        .maxConcurrentCallsPerConnection(grpcConfig.getMaxConcurrentCallsPerConnection());

ejona86 commented 3 years ago

isCancelled() can be true for many reasons including connections being killed or simple things like deadlines being exceeded. The best way to find out why it is true is to observe the onError() callback.
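
The pattern discussed here, checking for cancellation before each write and logging when the call is cancelled, can be sketched without any gRPC dependency. In the sketch below, `CancellableStream` is a hypothetical stand-in interface, not a grpc-java type; in grpc-java the analogous server-side hooks are `ServerCallStreamObserver.isCancelled()` and `setOnCancelHandler(Runnable)`, and on the client the `Throwable` passed to `onError()` can be converted with `Status.fromThrowable`. `FakeStream` and its cancel-after-three-writes behaviour exist only to make the example runnable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class CancelAwareWriter {
    /** Hypothetical stand-in for a server-side response stream; not a grpc-java type. */
    interface CancellableStream<T> {
        boolean isCancelled();
        void setOnCancelHandler(Runnable handler);
        void onNext(T value);
        void onCompleted();
    }

    /** Writes messages until all are sent or the call is cancelled; returns the count written. */
    static <T> int writeAll(CancellableStream<T> stream, List<T> messages) {
        // Register the callback first so the cancellation is logged when it happens.
        stream.setOnCancelHandler(() -> System.err.println("call cancelled; stopping producer"));
        int written = 0;
        for (T message : messages) {
            if (stream.isCancelled()) {
                return written; // peer, proxy, deadline, or transport ended the call
            }
            stream.onNext(message);
            written++;
        }
        stream.onCompleted();
        return written;
    }

    /** Test double that cancels itself after a fixed number of writes. */
    static final class FakeStream implements CancellableStream<String> {
        final List<String> received = new ArrayList<>();
        private final int cancelAfter;
        private Runnable onCancel = () -> {};
        private boolean cancelled;
        boolean completed;

        FakeStream(int cancelAfter) { this.cancelAfter = cancelAfter; }

        @Override public boolean isCancelled() { return cancelled; }
        @Override public void setOnCancelHandler(Runnable handler) { onCancel = handler; }
        @Override public void onNext(String value) {
            received.add(value);
            if (received.size() == cancelAfter) {
                cancelled = true;
                onCancel.run();
            }
        }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        FakeStream stream = new FakeStream(3);
        int written = writeAll(stream, Arrays.asList("a", "b", "c", "d", "e"));
        System.out.println(written + " written, completed=" + stream.completed);
    }
}
```

The check before each write only tells the producer that the call has ended; the reason still has to come from the error reported to the callbacks, which is why the cancel handler is the place to add logging.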

phamtai97 commented 3 years ago

I am seeing the following errors:

Server-side:

WARNING: Stream Error
io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
    at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:149)
    at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
    at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
    at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
    at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
    at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams$2.process(DefaultHttp2Connection.java:961)
    at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.decrementPendingIterations(DefaultHttp2Connection.java:1022)
    at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.forEachActiveStream(DefaultHttp2Connection.java:977)
    at io.netty.handler.codec.http2.DefaultHttp2Connection.forEachActiveStream(DefaultHttp2Connection.java:208)
    at io.grpc.netty.NettyServerHandler.forcefulClose(NettyServerHandler.java:663)
    at io.grpc.netty.NettyServerHandler.access$1200(NettyServerHandler.java:99)
    at io.grpc.netty.NettyServerHandler$KeepAlivePinger.onPingTimeout(NettyServerHandler.java:832)
    at io.grpc.internal.KeepAliveManager$1.run(KeepAliveManager.java:63)
    at io.grpc.internal.LogExceptionRunnable.run(LogExceptionRunnable.java:41)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:263)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Client-side:

io.grpc.StatusRuntimeException: CANCELLED: HTTP/2 error code: CANCEL
Received Rst Stream
    at io.grpc.Status.asRuntimeException(Status.java:532)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:434)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:398)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at io.vertx.grpc.VertxChannelBuilder.lambda$null$0(VertxChannelBuilder.java:256)
    at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
    at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
    at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:188)
    at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:180)
    at io.vertx.grpc.VertxChannelBuilder.lambda$build$1(VertxChannelBuilder.java:256)
    at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
    at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:588)
    at io.grpc.internal.DelayedStream$DelayedStreamListener$5.run(DelayedStream.java:441)
    at io.grpc.internal.DelayedStream$DelayedStreamListener.delayOrExecute(DelayedStream.java:383)
    at io.grpc.internal.DelayedStream$DelayedStreamListener.closed(DelayedStream.java:438)
    at io.grpc.internal.ForwardingClientStreamListener.closed(ForwardingClientStreamListener.java:39)
    at io.grpc.internal.InternalSubchannel$CallTracingTransport$1$1.closed(InternalSubchannel.java:720)
    at io.grpc.internal.AbstractClientStream$TransportState.closeListener(AbstractClientStream.java:462)
    at io.grpc.internal.AbstractClientStream$TransportState.access$400(AbstractClientStream.java:224)
    at io.grpc.internal.AbstractClientStream$TransportState$1.run(AbstractClientStream.java:445)
    at io.grpc.internal.AbstractClientStream$TransportState.deframerClosed(AbstractClientStream.java:281)
    at io.grpc.internal.Http2ClientStreamTransportState.deframerClosed(Http2ClientStreamTransportState.java:31)
    at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:229)
    at io.grpc.internal.MessageDeframer.closeWhenComplete(MessageDeframer.java:191)
    at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:183)
    at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:448)
    at io.grpc.netty.NettyClientHandler.onRstStreamRead(NettyClientHandler.java:385)
    at io.grpc.netty.NettyClientHandler.access$1300(NettyClientHandler.java:86)
    at io.grpc.netty.NettyClientHandler$FrameListener.onRstStreamRead(NettyClientHandler.java:803)
    at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onRstStreamRead(DefaultHttp2ConnectionDecoder.java:350)
    at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onRstStreamRead(Http2InboundFrameLogger.java:80)
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readRstStreamFrame(DefaultHttp2FrameReader.java:516)
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:260)
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:118)
    at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:390)
    at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:450)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

ejona86 commented 3 years ago

@phamtai97, are the client and server talking directly to each other? Or is there an HTTP proxy between them?

phamtai97 commented 3 years ago

Currently, there is a proxy between them, and I use a streaming RPC (one-to-many) to send data.

ejona86 commented 3 years ago

@phamtai97, the problem is probably due to the proxy. The warning on server-side only occurs if the RPC is cancelled by a remote, and the client also sees the RPC cancelled by a remote. So the proxy decided to cancel the RPC. That's all I can say.

Which proxy are you using? Does every RPC fail? Maybe the proxy isn't compatible with gRPC.

phamtai97 commented 3 years ago

Oh, thank you so much. I think that the proxy is the root cause.