akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/

PubSub GRPC does not handle server disconnection/retryable error #1615

Open vladimir-lu opened 5 years ago

vladimir-lu commented 5 years ago

Versions used

Alpakka version: 1.0-RC1
Akka version: 2.5.21

Expected Behavior

Subscribing with a PubSub GRPC client to a topic subscription using a streaming pull request should yield a constant stream of messages.

Notably the example code in https://doc.akka.io/docs/alpakka/current/google-cloud-pub-sub-grpc.html should work:

val request = StreamingPullRequest()
  .withSubscription(s"projects/$projectId/subscriptions/$subscription")
  .withStreamAckDeadlineSeconds(10)

val subscriptionSource = GooglePubSub.subscribe(request, pollInterval = 100.millis)

val res = subscriptionSource.runWith(Sink.foreach(println))
Await.result(res.map(identity), Duration.Inf)

Actual Behavior

Some time after start-up, usually 30 to 60 minutes later, the client receives a server disconnect and prints: UNAVAILABLE: The service was unable to fulfill your request. Please try again. [code=8a75]

This fails the stream, and the program exits with an exception.

However, this status is retryable, and indeed the Java client library does not even log the exception: https://github.com/googleapis/google-cloud-java/pull/2492 (see also https://stackoverflow.com/questions/46914087/google-pubsub-unavailable-the-service-was-unable-to-fulfill-your-request)
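
To make "retryable" concrete, here is a minimal sketch of how a caller might classify the failure. It assumes only UNAVAILABLE (gRPC status 14, as seen in the logs below) counts as retryable; the object and method names are hypothetical and are not part of Alpakka or the Google client library. `Status.fromThrowable` is the grpc-java helper that walks the cause chain looking for a gRPC status.

```scala
import io.grpc.Status

// Hypothetical helper, not part of Alpakka.
object RetryableStatus {
  // Assumption: only UNAVAILABLE is treated as retryable here.
  // Other codes could be added if the service documents them as transient.
  def isRetryable(t: Throwable): Boolean =
    // fromThrowable inspects t and its causes; it yields UNKNOWN when
    // no gRPC status is found, which this check treats as non-retryable.
    Status.fromThrowable(t).getCode == Status.Code.UNAVAILABLE
}
```

A predicate like this could decide whether a failed stream should be restarted or allowed to fail.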

Relevant logs

10:20:06.966 [grpc-default-worker-ELG-1-2] DEBUG i.g.n.s.i.g.netty.NettyClientHandler - [id: 0x5b822d9b, L:/10.99.55.69:47700 - R:pubsub.googleapis.com/172.217.17.138:443] OUTBOUND DATA: streamId=3 padding=0 endStream=false length=5 bytes=0000000000
10:20:06.972 [grpc-default-worker-ELG-1-2] DEBUG i.g.n.s.i.g.netty.NettyClientHandler - [id: 0x5b822d9b, L:/10.99.55.69:47700 - R:pubsub.googleapis.com/172.217.17.138:443] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[grpc-status: 14, grpc-message: The service was unable to fulfill your request. Please try again. [code=8a75], content-disposition: attachment] padding=0 endStream=true
10:20:06.974 [grpc-default-worker-ELG-1-2] DEBUG i.g.n.s.i.g.netty.NettyClientHandler - [id: 0x5b822d9b, L:/10.99.55.69:47700 - R:pubsub.googleapis.com/172.217.17.138:443] INBOUND RST_STREAM: streamId=3 errorCode=0
10:20:06.975 [grpc-default-worker-ELG-1-2] DEBUG i.g.n.s.i.g.netty.NettyClientHandler - [id: 0x5b822d9b, L:/10.99.55.69:47700 - R:pubsub.googleapis.com/172.217.17.138:443] INBOUND PING: ack=false bytes=0
10:20:06.975 [grpc-default-worker-ELG-1-2] DEBUG i.g.n.s.i.g.netty.NettyClientHandler - [id: 0x5b822d9b, L:/10.99.55.69:47700 - R:pubsub.googleapis.com/172.217.17.138:443] OUTBOUND PING: ack=true bytes=0
10:20:06.976 [grpc-default-worker-ELG-1-2] DEBUG i.g.n.s.i.g.netty.NettyClientHandler - [id: 0x5b822d9b, L:/10.99.55.69:47700 - R:pubsub.googleapis.com/172.217.17.138:443] OUTBOUND RST_STREAM: streamId=3 errorCode=8
Exception in thread "main" io.grpc.StatusRuntimeException: UNAVAILABLE: The service was unable to fulfill your request. Please try again. [code=8a75]
    at io.grpc.Status.asRuntimeException(Status.java:530)
    at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.onCallClosed(AkkaNettyGrpcClientGraphStage.scala:170)
    at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.$anonfun$callback$1(AkkaNettyGrpcClientGraphStage.scala:74)
    at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.$anonfun$callback$1$adapted(AkkaNettyGrpcClientGraphStage.scala:70)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:450)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:470)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:730)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:745)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:655)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
10:20:06.983 [pubsub-investigation-akka.actor.default-dispatcher-4] ERROR akka.stream.Materializer - [receive pubsub] Upstream failed.
io.grpc.StatusRuntimeException: UNAVAILABLE: The service was unable to fulfill your request. Please try again. [code=8a75]
    at io.grpc.Status.asRuntimeException(Status.java:530)
    at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.onCallClosed(AkkaNettyGrpcClientGraphStage.scala:170)
    at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.$anonfun$callback$1(AkkaNettyGrpcClientGraphStage.scala:74)
    at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.$anonfun$callback$1$adapted(AkkaNettyGrpcClientGraphStage.scala:70)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:450)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:470)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:730)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:745)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:655)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
10:24:06.982 [grpc-default-worker-ELG-1-2] DEBUG i.g.n.s.i.g.netty.NettyClientHandler - [id: 0x5b822d9b, L:/10.99.55.69:47700 - R:pubsub.googleapis.com/172.217.17.138:443] INBOUND GO_AWAY: lastStreamId=3 errorCode=0 length=17 bytes=73657373696f6e5f74696d65645f6f7574
10:24:06.986 [grpc-default-worker-ELG-1-2] DEBUG i.g.n.s.i.n.handler.ssl.SslHandler - [id: 0x5b822d9b, L:/10.99.55.69:47700 ! R:pubsub.googleapis.com/172.217.17.138:443] SSLEngine.closeInbound() raised an exception.
javax.net.ssl.SSLException: closing inbound before receiving peer's close_notify
    at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:129)
    at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:117)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:308)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:264)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:255)
    at java.base/sun.security.ssl.SSLEngineImpl.closeInbound(SSLEngineImpl.java:724)
    at io.grpc.netty.shaded.io.netty.handler.ssl.JdkSslEngine.closeInbound(JdkSslEngine.java:55)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.setHandshakeFailure(SslHandler.java:1558)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1045)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
    at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

Reproducible Test Case

Reproduction code included above - you need to run this against the live PubSub service for some time to reproduce.

2m commented 5 years ago

This could be solved ad hoc at the call site with RestartSource.

But it may be nicer to solve it in the connector itself, which would retry if the response indicates that the request is retryable.
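
A minimal sketch of the call-site workaround, assuming the three-argument `RestartSource.withBackoff` overload available in Akka 2.5 (later Akka versions take a `RestartSettings` instead). The `ResilientSubscribe` object, the `restarting` name, and the backoff values are illustrative choices, not part of Alpakka. Note that `withBackoff` re-creates the source on completion as well as on failure.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{RestartSource, Source}
import scala.concurrent.duration._

// Hypothetical wrapper, not part of Alpakka.
object ResilientSubscribe {
  // Re-creates the wrapped source with exponential backoff whenever it
  // fails or completes. The backoff values are arbitrary illustrations.
  def restarting[T](newSource: () => Source[T, _]): Source[T, NotUsed] =
    RestartSource.withBackoff(
      minBackoff = 1.second,
      maxBackoff = 30.seconds,
      randomFactor = 0.2 // adds jitter so many clients do not reconnect in lockstep
    )(newSource)
}
```

With the request from the issue description, this would be used as `ResilientSubscribe.restarting(() => GooglePubSub.subscribe(request, pollInterval = 100.millis))`. Because a restart opens a new streaming pull, messages that were delivered but not yet acknowledged may be redelivered.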

ramazanyich commented 3 years ago

We also get the same issue with the gRPC PubSub client using Alpakka 2.0.2:

io.grpc.StatusRuntimeException: INTERNAL: HTTP/2 error code: INTERNAL_ERROR Received Rst Stream
    at io.grpc.Status.asRuntimeException(Status.java:533)
    at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.onCallClosed(AkkaNettyGrpcClientGraphStage.scala:165)
    at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.$anonfun$callback$1(AkkaNettyGrpcClientGraphStage.scala:71)
    at akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1.$anonfun$callback$1$adapted(AkkaNettyGrpcClientGraphStage.scala:67)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:466)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:498)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
    at

It would be nice if the suggested PR were merged into the main branch. For now we have to work around the problem with RestartSource.