Lincong opened this issue 1 year ago.
This behavior makes adding both on-responseHeaders and on-responseTrailers callbacks not an option for me, because the on-responseHeaders callback will be invoked too late for the circuit breaker to short-circuit the next client stub invocation.
If a trailers-only response is sent because of an error on the server side, the response stream is closed immediately. Hence, onResponseHeaders() will be called right after that, and the rule's decision will be reported to the circuit breaker. Although two callbacks are needed to properly extract the gRPC status from trailers, I don't see why the failure report for a trailers-only response would be delayed.
Make Armeria aware of such "trailers-only" responses.
It is a nice suggestion. It is tricky for users to extract grpc-status from trailers, because they have to know the gRPC wire format. Similar to the onStatus(...) callback, we can create a GrpcCircuitBreakerRule and add an onGrpcStatus() method to it.
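The following is a minimal, hypothetical sketch that illustrates why this is tricky. It shows what such a helper has to do: read grpc-status from the trailers and, for a trailers-only response, fall back to the initial headers. Plain Map[String, String] values stand in for Armeria's header types. The names GrpcStatusExtractor, grpcStatus and isFailure are made up for this example and are not the GrpcCircuitBreakerRule API.

```scala
// Hypothetical helper, not part of Armeria. Header blocks are modelled as
// Map[String, String] with lower-case names instead of Armeria's header types.
object GrpcStatusExtractor {
  // Parses "grpc-status" from one header block if it is present and numeric.
  private def parse(block: Map[String, String]): Option[Int] =
    block.get("grpc-status").flatMap(_.trim.toIntOption)

  // Prefers the trailers; falls back to the initial headers, which is where a
  // trailers-only response carries its status.
  def grpcStatus(headers: Map[String, String], trailers: Map[String, String]): Option[Int] =
    parse(trailers).orElse(parse(headers))

  // Anything other than OK (0), including a missing status, counts as a failure.
  def isFailure(headers: Map[String, String], trailers: Map[String, String]): Boolean =
    !grpcStatus(headers, trailers).contains(0)
}

// A trailers-only response similar to the ones in the logs of this thread.
val trailersOnly = Map(":status" -> "200", "content-type" -> "application/grpc+proto", "grpc-status" -> "7")
println(GrpcStatusExtractor.grpcStatus(trailersOnly, Map.empty)) // Some(7)
println(GrpcStatusExtractor.isFailure(trailersOnly, Map.empty))  // true
```

Treating a missing grpc-status as a failure is a choice made for this sketch only. A real rule might prefer to ignore such responses instead.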
Created #4496 for GrpcCircuitBreakerRule. 😄
Thanks @ikhoon for the response.
I replied here.
Although we need to use two callbacks to properly extract gRPC status from trailers, I don't see the failure report using trailers-only is delayed.
This is not what I observed in my unit test.
My observation shows the following. Suppose a CircuitBreakerRule has callbacks from both onResponseHeaders and onResponseTrailers registered, and the server throws an exception (which, according to you, immediately closes the response stream and then immediately calls onResponseHeaders). In that case, neither callback gets called until much later, and sometimes not until after the next stub invocation starts. I can double-check my observation.
IIRC, from my reading of the code, when a CircuitBreakerRule has callbacks from both onResponseHeaders and onResponseTrailers registered, the rule's requiresResponseTrailers flag becomes true. When requiresResponseTrailers == true, none of the rule's callbacks get called until either an HTTP trailer arrives or some other condition is met (the next invocation starts? some timeout?). I can double-check my understanding of the code as well.
Thanks!
@ikhoon, if you can point me to the code showing the behavior below, that'd be great!
If trailers-only is sent due to an error on the server side, the response stream is immediately closed. Hence, onResponseHeaders() will be called consecutively
This is not what I observed in my unit test.
Let me also check it locally. If it is not working as I said, it would be a bug.
@ikhoon if you can point me to the code showing the below behavior, that'd be great!
If you debug the following lines with breakpoints, you can check that a failed response is completed as soon as the trailers-only response is received:
https://github.com/line/armeria/blob/876339437d566cd74e0f87a090430441b3da1f4e/core/src/main/java/com/linecorp/armeria/client/Http2ResponseDecoder.java#L215-L217
https://github.com/line/armeria/blob/daae15d426ffb2581fdff9f10f16ec69f90b7a36/core/src/main/java/com/linecorp/armeria/common/logging/DefaultRequestLog.java#L412
https://github.com/line/armeria/blob/a9592f49c501beff47bdb02788d6e79009c51ffc/core/src/main/java/com/linecorp/armeria/client/circuitbreaker/CircuitBreakerClient.java#L309
My observation shows that if a CircuitBreakerRule has callbacks from onResponseHeaders and onResponseTrailers registered, when the server throws an exception (which, according to you, immediately closes the response stream and then immediately calls onResponseHeaders), neither callbacks get called until much later (and sometimes even after the next stub invocation starts). I can double check my observation.
This is actually what I didn't get. 😅 Could you share a small reproducer with us, please?
To chime in a little bit, here is what I observed. When a response carries an error as a trailers-only "header", the stack trace shows that the gRPC-related callback was triggered from this line. The onResponseTrailers callback is eventually a callback added via log().whenAvailable(RESPONSE_TRAILERS), and it is invoked one line after that, here. That creates a time window in which the user sees the response (or the exception from the response) first, while the trailers callback hasn't been called yet.
Thanks @ikhoon and @minwoox!
I did more digging with my test setup, and here is what I found: "a failed response is completed as the trailers-only is received" did not happen in my test. Netty did NOT receive endOfStream immediately after the trailers-only response was received, and this is the cause of the delayed callback invocation.
Server side: an exception with gRPC status ErrorCode.PERMISSION_DENIED is thrown in the application logic as soon as a request is received.
Client side: the circuit breaker is set up in 2 ways:
1. With only onResponseHeaders:
```scala
// In this build, Armeria is shaded under grpc_shaded.com.linecorp.armeria.
import com.linecorp.armeria.client.circuitbreaker.CircuitBreakerRule

val rule = CircuitBreakerRule.of(
  CircuitBreakerRule
    .builder()
    .onResponseHeaders(
      (_, respHeaders) => {
        println("+++ onResponseHeaders called at " + System.currentTimeMillis() + " with: " + respHeaders)
        println("+++ Stack trace: " + Thread.currentThread().getStackTrace().mkString("\n"))
        // ...
        false
      }
    )
    .thenFailure()
)
```
2. With both onResponseHeaders and onResponseTrailers:
```scala
// In this build, Armeria is shaded under grpc_shaded.com.linecorp.armeria.
import com.linecorp.armeria.client.circuitbreaker.CircuitBreakerRule

val rule = CircuitBreakerRule.of(
  // 1st rule: inspects the response headers
  CircuitBreakerRule
    .builder()
    .onResponseHeaders(
      (_, respHeaders) => {
        println("+++ onResponseHeaders called at " + System.currentTimeMillis() + " with: " + respHeaders)
        println("+++ Stack trace: " + Thread.currentThread().getStackTrace().mkString("\n"))
        // ...
        false
      }
    )
    .thenFailure(),
  // 2nd rule: inspects the response trailers
  CircuitBreakerRule
    .builder()
    .onResponseTrailers(
      (_, trailers) => {
        println("### onResponseTrailers called with: " + trailers)
        false
      }
    )
    .thenFailure()
)
```
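I added print statements before and after my gRPC client stub invocation, like this:
```scala
// In this build, gRPC is shaded as grpc_shaded.io.grpc.
import io.grpc.StatusRuntimeException

// intercept comes from ScalaTest's Assertions (this loop runs inside a test body).
for (attempt <- 0 until 20) {
  println("--- before calling foo(): attempt: " + attempt)
  val ex = intercept[StatusRuntimeException](
    clientStub.foo(...) // Expect the server to reject the call with PERMISSION_DENIED
  )
  println("--- after calling foo. Got exception: " + ex + " at " + System.currentTimeMillis())
}
```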
Print statement output with only onResponseHeaders
Company-specific content has been removed and replaced with #####.
Note that "+++ onResponseHeaders called" appears after "--- before calling foo(): attempt: 0" and before "--- after calling foo...". This is as expected.
--- before calling foo(): attempt: 0
+++ onResponseHeaders called at 1666375550988 with: [EOS, :status=200, content-type=application/grpc+proto, grpc-status=7, grpc-message=PERMISSION_DENIED: permission_denied attempt 0, #####, date=Fri, 21 Oct 2022 18:05:50 GMT, content-length=0]
+++ Stack trace: java.lang.Thread.getStackTrace(Thread.java:1559)
#####.$anonfun$decorateWithCircuitBreaker$1(#####)
grpc_shaded.com.linecorp.armeria.internal.client.AbstractRuleBuilderUtil.lambda$buildFilter$0(AbstractRuleBuilderUtil.java:67)
grpc_shaded.com.linecorp.armeria.client.circuitbreaker.CircuitBreakerRuleBuilder$1.shouldReportAsSuccess(CircuitBreakerRuleBuilder.java:99)
grpc_shaded.com.linecorp.armeria.client.circuitbreaker.CircuitBreakerClient.lambda$reportResult$4(CircuitBreakerClient.java:310)
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog$RequestLogFuture.completeLog(DefaultRequestLog.java:1662)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog.completeSatisfiedFutures(DefaultRequestLog.java:424)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog.updateFlags(DefaultRequestLog.java:412)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog.updateFlags(DefaultRequestLog.java:394)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog.responseHeaders(DefaultRequestLog.java:1247)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.handleWaitNonInformational(HttpResponseDecoder.java:266)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.tryWrite(HttpResponseDecoder.java:229)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.tryWrite(HttpResponseDecoder.java:157)
grpc_shaded.com.linecorp.armeria.common.stream.StreamWriter.write(StreamWriter.java:70)
grpc_shaded.com.linecorp.armeria.client.Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:209)
grpc_shaded.com.linecorp.armeria.client.Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:225)
grpc_shaded.io.netty.handler.codec.http2.Http2FrameListenerDecorator.onHeadersRead(Http2FrameListenerDecorator.java:48)
grpc_shaded.io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onHeadersRead(Http2EmptyDataFrameListener.java:63)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:409)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:337)
grpc_shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:56)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:476)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:484)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:253)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
grpc_shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
grpc_shaded.io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
grpc_shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
grpc_shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
grpc_shaded.io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
grpc_shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
grpc_shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
grpc_shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
grpc_shaded.io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
grpc_shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
grpc_shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
grpc_shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
grpc_shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
grpc_shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
grpc_shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
grpc_shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
grpc_shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
grpc_shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
--- after calling foo. Got exception: grpc_shaded.io.grpc.StatusRuntimeException: PERMISSION_DENIED: PERMISSION_DENIED: permission_denied attempt 0 at 1666375551015
--- before calling foo(): attempt: 1
Print statement output with both onResponseHeaders and onResponseTrailers
As you can see, "+++ onResponseHeaders called" appears after the first call returned ("--- after calling foo...") and even after the second call started ("--- before calling foo(): attempt: 1").
--- before calling foo(): attempt: 0
--- after calling foo. Got exception: grpc_shaded.io.grpc.StatusRuntimeException: PERMISSION_DENIED: PERMISSION_DENIED: permission_denied attempt 0 at 1666376176325
--- before calling foo(): attempt: 1
+++ onResponseHeaders called at 1666376176338 with: [EOS, :status=200, content-type=application/grpc+proto, grpc-status=7, grpc-message=PERMISSION_DENIED: permission_denied attempt 0, ######, date=Fri, 21 Oct 2022 18:16:16 GMT, content-length=0]
+++ Stack trace: java.lang.Thread.getStackTrace(Thread.java:1559)
##########.$anonfun$decorateWithCircuitBreaker$1(##########)
grpc_shaded.com.linecorp.armeria.internal.client.AbstractRuleBuilderUtil.lambda$buildFilter$0(AbstractRuleBuilderUtil.java:67)
grpc_shaded.com.linecorp.armeria.client.circuitbreaker.CircuitBreakerRuleBuilder$1.shouldReportAsSuccess(CircuitBreakerRuleBuilder.java:99)
grpc_shaded.com.linecorp.armeria.client.circuitbreaker.CircuitBreakerRuleUtil$3.shouldReportAsSuccess(CircuitBreakerRuleUtil.java:83)
grpc_shaded.com.linecorp.armeria.client.circuitbreaker.CircuitBreakerClient.lambda$reportResult$4(CircuitBreakerClient.java:310)
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog$RequestLogFuture.completeLog(DefaultRequestLog.java:1662)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog.completeSatisfiedFutures(DefaultRequestLog.java:424)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog.updateFlags(DefaultRequestLog.java:412)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog.endResponse0(DefaultRequestLog.java:1377)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog.endResponse0(DefaultRequestLog.java:1338)
grpc_shaded.com.linecorp.armeria.common.logging.DefaultRequestLog.endResponse(DefaultRequestLog.java:1319)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.cancelAction(HttpResponseDecoder.java:349)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.cancelTimeoutOrLog(HttpResponseDecoder.java:368)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.close(HttpResponseDecoder.java:318)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.onSubscriptionCancelled(HttpResponseDecoder.java:303)
grpc_shaded.com.linecorp.armeria.client.Http2ResponseDecoder.onWrapperCompleted(Http2ResponseDecoder.java:96)
grpc_shaded.com.linecorp.armeria.client.Http2ResponseDecoder.lambda$addResponse$1(Http2ResponseDecoder.java:85)
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
grpc_shaded.com.linecorp.armeria.common.stream.AbstractStreamMessage$CloseEvent.notifySubscriber(AbstractStreamMessage.java:259)
grpc_shaded.com.linecorp.armeria.common.stream.DefaultStreamMessage.notifySubscriberOfCloseEvent0(DefaultStreamMessage.java:300)
grpc_shaded.com.linecorp.armeria.common.stream.DefaultStreamMessage.notifySubscriberOfCloseEvent(DefaultStreamMessage.java:292)
grpc_shaded.com.linecorp.armeria.common.stream.DefaultStreamMessage.handleCloseEvent(DefaultStreamMessage.java:429)
grpc_shaded.com.linecorp.armeria.common.stream.DefaultStreamMessage.notifySubscriber0(DefaultStreamMessage.java:372)
grpc_shaded.com.linecorp.armeria.common.stream.DefaultStreamMessage.notifySubscriber(DefaultStreamMessage.java:328)
grpc_shaded.com.linecorp.armeria.common.stream.DefaultStreamMessage.addObjectOrEvent(DefaultStreamMessage.java:314)
grpc_shaded.com.linecorp.armeria.common.stream.DefaultStreamMessage.close(DefaultStreamMessage.java:436)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.closeAction(HttpResponseDecoder.java:335)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.cancelTimeoutOrLog(HttpResponseDecoder.java:370)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.close(HttpResponseDecoder.java:318)
grpc_shaded.com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.close(HttpResponseDecoder.java:308)
grpc_shaded.com.linecorp.armeria.client.Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:216)
grpc_shaded.com.linecorp.armeria.client.Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:225)
grpc_shaded.io.netty.handler.codec.http2.Http2FrameListenerDecorator.onHeadersRead(Http2FrameListenerDecorator.java:48)
grpc_shaded.io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onHeadersRead(Http2EmptyDataFrameListener.java:63)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:409)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:337)
grpc_shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:56)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:476)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:484)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:253)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
grpc_shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
grpc_shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
grpc_shaded.io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
grpc_shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
grpc_shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
grpc_shaded.io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
grpc_shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
grpc_shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
grpc_shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
grpc_shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
grpc_shaded.io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
grpc_shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
grpc_shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
grpc_shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
grpc_shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
grpc_shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
grpc_shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
grpc_shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
grpc_shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
grpc_shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
grpc_shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
grpc_shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
--- after calling foo. Got exception: grpc_shaded.io.grpc.StatusRuntimeException: PERMISSION_DENIED: PERMISSION_DENIED: permission_denied attempt 1 at 1666376176356
--- before calling foo(): attempt: 2
// ...
The bottom parts of both stack traces are the same up to Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:225). The difference begins there. In the expected case, the callback was reached from Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:209) (here). In the delayed case, where both onResponseHeaders and onResponseTrailers are used, it was reached from Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:216) (here).
I looked into the Netty code in the stack trace and learned that Netty sets the endOfStream boolean flag according to the HTTP/2 RFC:
END_STREAM (0x1): When set, bit 0 indicates that this frame is the last that the endpoint will send for the identified stream.
I think this means 2 things:
1. When both onResponseHeaders and onResponseTrailers are used, the callbacks are invoked when Netty sees END_STREAM in an HTTP frame. Netty did NOT see the END_STREAM HTTP frame until after the 2nd invocation attempt started (as shown in the stack trace).
2. The trailers-only response itself does NOT contain an END_STREAM HTTP frame. Actually, the gRPC protocol specification does not promise that. This is why I claimed that "a failed response is completed as the trailers-only is received" did NOT happen in my test. However, the trailers-only response did contain an EOS, as shown below (and also in the stack trace above):
+++ onResponseHeaders called at 1666376176338 with: [EOS, :status=200, content-type=...
That means gRPC returns a trailers-only response containing EOS. However, Netty does not interpret it as an HTTP/2-level endOfStream. The actual endOfStream is received (and the callbacks are invoked) much later, even after the next stub invocation has started.
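The END_STREAM flag quoted from the RFC above is just bit 0 of a frame's flags byte. The standalone sketch below decodes the flags of an HTTP/2 HEADERS frame using the bit values from the HTTP/2 specification. It is not Netty code; Netty reports the result to listeners as the endOfStream argument of Http2FrameListener.onHeadersRead. The object name Http2HeadersFlags is made up for this example.

```scala
// HEADERS frame flag bits from the HTTP/2 specification (RFC 9113, section 6.2).
object Http2HeadersFlags {
  val EndStream  = 0x01 // last frame the sender will send on this stream
  val EndHeaders = 0x04 // the header block is complete; no CONTINUATION follows
  val Padded     = 0x08 // the frame carries padding
  val Priority   = 0x20 // the frame carries priority fields

  def isEndStream(flags: Int): Boolean = (flags & EndStream) != 0

  // Names of the flags set in a HEADERS frame's flags byte, in bit order.
  def describe(flags: Int): List[String] =
    List(EndStream -> "END_STREAM", EndHeaders -> "END_HEADERS", Padded -> "PADDED", Priority -> "PRIORITY")
      .collect { case (bit, name) if (flags & bit) != 0 => name }
}

println(Http2HeadersFlags.describe(0x05))    // List(END_STREAM, END_HEADERS)
println(Http2HeadersFlags.isEndStream(0x04)) // false
```

A trailers-only response sent as a single HEADERS frame that also ends the stream would carry flags 0x05 (END_STREAM | END_HEADERS).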
Thanks for reading through this long comment and please let me know what you think!
After reading more of the HTTP/2 RFC, I may have a better understanding of the issue.
HTTP/2 data is received as a sequence of frames. When Netty has received enough frames to build the headers, the onHeaders callback is invoked. However, at this point, Netty has not received the END_STREAM frame yet. When Netty receives the END_STREAM frame, res.close() is called (Http2ResponseDecoder.java:216 in the second stack trace), and then both the onHeaders and onTrailers callbacks are called.
In my test case, there is a race condition. After Netty receives all the headers, Armeria detects the gRPC error and throws it to the stub invocation level. However, at this point, Netty still has not received the HTTP/2 END_STREAM frame. This is why the circuit breaker callbacks were invoked after the stub-invocation-level exception was thrown.
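The race can be illustrated with a small deterministic model. The model is hypothetical: the Step names and OrderingModel are made up. In reality the event loop and the calling thread run concurrently, so this replays just one possible interleaving, chosen to match the second log above. It assumes two things. A headers-only rule is evaluated while the headers are processed, before the stub sees them, as the first stack trace suggests. A rule that requires trailers is evaluated only when the response is closed.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical model; these names are not Armeria or Netty types.
sealed trait Step
case object HeadersDelivered extends Step // headers processed; the stub can now fail the call
case object NextCallStarted  extends Step // the caller's thread starts its next attempt
case object ResponseClosed   extends Step // the response is closed once END_STREAM is handled

object OrderingModel {
  // Replays one interleaving and records when the circuit breaker rule is evaluated
  // relative to what the caller observes.
  def replay(steps: Seq[Step], requiresTrailers: Boolean): List[String] = {
    val log = ListBuffer.empty[String]
    steps.foreach {
      case HeadersDelivered =>
        if (!requiresTrailers) log += "rule evaluated" // headers-only rule: no need to wait
        log += "call failed"
      case NextCallStarted =>
        log += "next call started"
      case ResponseClosed =>
        if (requiresTrailers) log += "rule evaluated" // trailers-based rule waits for the end
    }
    log.toList
  }
}

val interleaving = Seq(HeadersDelivered, NextCallStarted, ResponseClosed)
println(OrderingModel.replay(interleaving, requiresTrailers = false))
println(OrderingModel.replay(interleaving, requiresTrailers = true))
```

The first call prints List(rule evaluated, call failed, next call started). The second prints List(call failed, next call started, rule evaluated). These mirror the two print-statement outputs in this thread.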
I think the fix should be in Armeria. When both onHeaders and onTrailers callbacks are added to the CircuitBreakerRule, do NOT wait until the HTTP/2 END_STREAM frame is received to invoke the callbacks. Instead, invoke the onHeaders callback as soon as the header frames have been received. This is more truthful, because the callback is named onHeaders instead of onHttp2EndStream :).
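The proposal above could look roughly like the hypothetical sketch below. The header-based check runs as soon as the headers arrive and is reported immediately when it is conclusive; only otherwise does the rule wait for the trailers. The names EagerRule, ReportSuccess, ReportFailure and NoDecision are made up for illustration. They are not Armeria's CircuitBreakerRule or CircuitBreakerDecision API.

```scala
// Hypothetical decision type for this sketch only.
sealed trait Decision
case object ReportSuccess extends Decision
case object ReportFailure extends Decision
case object NoDecision    extends Decision // this check cannot decide on its own

// Hypothetical rule that evaluates the header check eagerly.
final class EagerRule(
    onHeaders: Map[String, String] => Decision,
    onTrailers: Map[String, String] => Decision) {

  // Called as soon as the headers arrive. Some(d) means d can be reported right away,
  // so the circuit breaker sees a failure before the caller's next attempt.
  def decideOnHeaders(headers: Map[String, String]): Option[Decision] =
    onHeaders(headers) match {
      case NoDecision => None
      case d          => Some(d)
    }

  // Called when the response ends, only if decideOnHeaders returned None.
  // Assumption of this sketch: if nothing matched, the request counts as a success.
  def decideOnTrailers(trailers: Map[String, String]): Decision =
    onTrailers(trailers) match {
      case NoDecision => ReportSuccess
      case d          => d
    }
}

// Example check: classify by grpc-status when it is present.
val byGrpcStatus: Map[String, String] => Decision = m =>
  m.get("grpc-status") match {
    case Some("0") => ReportSuccess
    case Some(_)   => ReportFailure
    case None      => NoDecision
  }

val rule = new EagerRule(byGrpcStatus, byGrpcStatus)
println(rule.decideOnHeaders(Map("grpc-status" -> "7"))) // Some(ReportFailure)
println(rule.decideOnHeaders(Map(":status" -> "200")))   // None
```

Keeping the two phases separate lets a trailers-only failure be reported without waiting, while normal responses still fall through to the trailer check.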
Let me know what you think. Thanks!
Print statement output with both onResponseHeaders and onResponseTrailers
As you can see, +++ onResponseHeaders called happened after the first call (after calling foo...) and even after the start of the second call --- before calling foo(): attempt: 1.
We don't guarantee that onResponseTrailers() is called before foo() returns or raises an exception. CircuitBreakerClient checks the responses and reports the CircuitBreakerDecision to the CircuitBreaker asynchronously.
- If only onResponseHeaders() is registered, onResponseHeaders() may be called before foo() returns, because it doesn't need to wait for the response to be closed.
- If both onResponseHeaders() and onResponseTrailers() are registered, the callbacks may be called after the call raises an error. The call may finish immediately (here), while the callbacks are called later (here).
Trailers-only response itself does NOT contain END_STREAM HTTP frame.
It is possible. But I guess you used AbstractUnaryGrpcService. If so, END_STREAM is always sent with a trailers-only response:
https://github.com/line/armeria/blob/2343b6ebb04ef8d8da9fb5e25663a11f626a3939/core/src/main/java/com/linecorp/armeria/server/AbstractHttpResponseHandler.java#L148
Context
In my setup, Armeria + unary gRPC are used. On the client side, I decorate the ClientBuilder with CircuitBreaker and CircuitBreakerRule to set up circuit breaking based on the gRPC status code (e.g. when the gRPC status code != OK, count the request as a failure).
Initially, I passed a callback to CircuitBreakerRule.builder().onResponseTrailers(...). I expected it to be invoked when HTTP trailers are received, so that I could extract the gRPC status code from them. However, I realized that when the server side throws an exception, the server does not send HTTP trailers (according to the gRPC protocol specification). Instead, the specification calls this a "trailers-only" response, which is physically represented as HTTP headers, with no body and no HTTP trailers. Therefore, in this case, the callback passed to CircuitBreakerRule.builder().onResponseTrailers(...) does not get invoked until after the exception is thrown at the client stub invocation level. When the callback is invoked, the trailers are empty. This is expected, because gRPC's "trailers-only" response has no HTTP trailers.
Temporary workaround
I changed CircuitBreakerRule.builder().onResponseTrailers(...) to CircuitBreakerRule.builder().onResponseHeaders(...) and tried to extract the gRPC status from the response HTTP headers. This works when the server throws an exception and the response contains only HTTP headers ("trailers-only" in gRPC terminology) with the gRPC status code.
Issues
1. The temporary workaround only lets the circuit breaker detect failures in trailers-only responses. The server may instead send back a response in 3 parts, namely headers, response body, and trailers, with the gRPC status code in the trailers (as documented in the gRPC protocol specification). The circuit breaker is completely blind to any failure reported that way.
2. Correct me if I am wrong on this one. Suppose a CircuitBreakerRule.builder() has 2 callbacks registered, one from onResponseTrailers(...) and the other from onResponseHeaders(...). Then the onResponseHeaders callback does not get invoked when HTTP headers are received. Instead, it is invoked when HTTP trailers are received or when Armeria determines that there will be no HTTP trailers. Each CircuitBreakerRule has a requiresResponseTrailers flag that decides which registered callbacks should be invoked. This behavior makes adding both on-responseHeaders and on-responseTrailers callbacks not an option for me. The on-responseHeaders callback will be invoked too late for the circuit breaker to short-circuit the next client stub invocation.
Proposed improvement
1. Address Issues #2 so that when CircuitBreakerRule.builder() has both on-responseHeaders and on-responseTrailers callbacks registered, the on-responseHeaders callback is invoked immediately when HTTP headers are received. IMHO it does not make sense semantically to wait for trailers and then invoke the on-responseHeaders callback.
2. Make Armeria aware of such "trailers-only" responses and invoke on-responseHeaders (with the received HTTP headers) and on-responseTrailers (with nothing) immediately. Armeria supports gRPC as a first-class citizen, so it is reasonable (to some extent at least) for Armeria to interpret gRPC trailers-only responses.
Let me know if I misunderstood anything and what you think. Thanks!
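The trailers-only awareness proposed above could be sketched as follows. The gRPC over HTTP/2 protocol description says that normal response headers do not include grpc-status; only a trailers-only response carries it, in its single header block. Following that, the hypothetical TrailersOnly helper treats a first header block containing grpc-status as trailers-only. It then hands both callbacks their inputs right away: the headers, plus empty trailers. This is an illustration, not an existing Armeria API.

```scala
// Hypothetical helper; header blocks are modelled as Map[String, String].
object TrailersOnly {
  // In the gRPC HTTP/2 mapping, grpc-status appears in the first header block
  // only when the response is trailers-only.
  def isTrailersOnly(headers: Map[String, String]): Boolean =
    headers.contains("grpc-status")

  // Inputs for the (headers, trailers) callbacks when they can run immediately,
  // or None when the real trailers still have to be awaited.
  def immediateInputs(headers: Map[String, String]): Option[(Map[String, String], Map[String, String])] =
    if (isTrailersOnly(headers)) Some((headers, Map.empty[String, String]))
    else None
}

val errorHeaders  = Map(":status" -> "200", "content-type" -> "application/grpc+proto", "grpc-status" -> "7")
val normalHeaders = Map(":status" -> "200", "content-type" -> "application/grpc+proto")
println(TrailersOnly.isTrailersOnly(errorHeaders))  // true
println(TrailersOnly.isTrailersOnly(normalHeaders)) // false
```

Detection here relies only on the header contents, not on END_STREAM. Whether END_STREAM always accompanies a trailers-only response is exactly the point debated earlier in this thread.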