akka / akka-http

The Streaming-first HTTP server/module of Akka
https://doc.akka.io/libraries/akka-http/current/
Other
1.34k stars 594 forks source link

Http2StreamHandling - handleOutgoingEnded received unexpectedly in state Closed #2957

Closed steinybot closed 4 years ago

steinybot commented 4 years ago

I had just started an Akka gRPC service and was trying it out with grpcurl. I noticed a log message which said that it might be a bug and to report it so here I go.

The individual message is:

[info] [WARN] [02/11/2020 22:10:46.808] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [Http2ServerDemux(akka://ConsoleAPIServer)] handleOutgoingEnded received unexpectedly in state Closed. This indicates a bug in Akka HTTP, please report it to the issue tracker.

The mostly full log is:

[info] [DEBUG] [02/11/2020 21:55:05.585] [main] [akka.actor.ActorSystemImpl(ConsoleAPIServer)] Binding server using HTTP/2
[info] [DEBUG] [02/11/2020 21:55:05.729] [ConsoleAPIServer-akka.actor.internal-dispatcher-6] [akka://ConsoleAPIServer/system/IO-TCP/selectors/$a/0] Successfully bound to /127.0.0.1:8080
[info] 21:55:05.735 INFO  - Console API Server is bound to: /127.0.0.1:8080 (ConsoleServer.scala:37)
[info] [DEBUG] [02/11/2020 21:57:17.874] [ConsoleAPIServer-akka.actor.internal-dispatcher-2] [akka://ConsoleAPIServer/system/IO-TCP/selectors/$a/0] New connection accepted
[info] [DEBUG] [02/11/2020 21:58:18.095] [ConsoleAPIServer-akka.actor.default-dispatcher-9] [akka://ConsoleAPIServer/system/Materializers/StreamSupervisor-0/flow-1-0-detacher] Aborting tcp connection to /127.0.0.1:55991 because of upstream failure: akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [/127.0.0.1:55991], no bytes passed in the last 1 minute
[info] [DEBUG] [02/11/2020 21:58:18.096] [ConsoleAPIServer-akka.actor.default-dispatcher-9] [TcpConnectionStage$TcpStreamLogic(akka://ConsoleAPIServer)] Aborting connection from 127.0.0.1:55991 because of downstream failure: java.util.concurrent.TimeoutException: No elements passed in the last 1 minute.
[info] [DEBUG] [02/11/2020 22:08:49.586] [ConsoleAPIServer-akka.actor.internal-dispatcher-2] [akka://ConsoleAPIServer/system/IO-TCP/selectors/$a/0] New connection accepted
[info] [WARN] [02/11/2020 22:08:49.593] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(ConsoleAPIServer)] Illegal request, responding with status '400 Bad Request': Unsupported HTTP method: The HTTP method started with 0x16 rather than any known HTTP method. Perhaps this was an HTTPS request sent to an HTTP endpoint?
[info] [DEBUG] [02/11/2020 22:08:49.595] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [StreamUtils$DelayCancellationStage$$anon$11(akka://ConsoleAPIServer)] Delaying cancellation for 1 minute
[info] [DEBUG] [02/11/2020 22:09:46.631] [ConsoleAPIServer-akka.actor.internal-dispatcher-12] [akka://ConsoleAPIServer/system/IO-TCP/selectors/$a/0] New connection accepted
[info] [DEBUG] [02/11/2020 22:09:46.660] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForNetworkToSendControlFrames
[info] [DEBUG] [02/11/2020 22:09:46.661] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from WaitingForNetworkToSendControlFrames to Idle
[info] [DEBUG] [02/11/2020 22:09:46.665] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForData
[info] [DEBUG] [02/11/2020 22:09:46.670] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from WaitingForData to Idle
[info] [DEBUG] [02/11/2020 22:09:46.670] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForData
[info] [DEBUG] [02/11/2020 22:09:46.671] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Got unhandled event SettingsAckFrame(List())
[info] [DEBUG] [02/11/2020 22:09:46.682] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from WaitingForData to Idle
[info] [DEBUG] [02/11/2020 22:09:46.682] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Received DATA 52 for stream [1], remaining window space now 65483, buffered: 52
[info] [DEBUG] [02/11/2020 22:09:46.684] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForData
[info] [DEBUG] [02/11/2020 22:09:46.686] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from WaitingForData to Idle
[info] [DEBUG] [02/11/2020 22:09:46.690] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForData
[info] [WARN] [02/11/2020 22:09:46.694] [ConsoleAPIServer-akka.actor.default-dispatcher-15] [Http2ServerDemux(akka://ConsoleAPIServer)] handleOutgoingEnded received unexpectedly in state HalfClosedLocal. This indicates a bug in Akka HTTP, please report it to the issue tracker.
[info] [DEBUG] [02/11/2020 22:09:49.617] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [StreamUtils$DelayCancellationStage$$anon$11(akka://ConsoleAPIServer)] Stage was canceled after delay of 1 minute
[info] [DEBUG] [02/11/2020 22:10:46.806] [ConsoleAPIServer-akka.actor.default-dispatcher-14] [akka://ConsoleAPIServer/system/Materializers/StreamSupervisor-0/flow-5-0-detacher] Aborting tcp connection to /127.0.0.1:56141 because of upstream failure: akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [/127.0.0.1:56141], no bytes passed in the last 1 minute
[info] [DEBUG] [02/11/2020 22:10:46.806] [ConsoleAPIServer-akka.actor.default-dispatcher-14] [TcpConnectionStage$TcpStreamLogic(akka://ConsoleAPIServer)] Aborting connection from 127.0.0.1:56141 because of downstream failure: java.util.concurrent.TimeoutException: No elements passed in the last 1 minute.
[info] [WARN] [02/11/2020 22:10:46.808] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [Http2ServerDemux(akka://ConsoleAPIServer)] handleOutgoingEnded received unexpectedly in state Closed. This indicates a bug in Akka HTTP, please report it to the issue tracker.
[info] [DEBUG] [02/11/2020 22:10:46.808] [ConsoleAPIServer-akka.actor.internal-dispatcher-16] [akka://ConsoleAPIServer/system/IO-TCP/selectors/$a/0] New connection accepted
[info] [DEBUG] [02/11/2020 22:10:46.814] [ConsoleAPIServer-akka.actor.default-dispatcher-13] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForNetworkToSendControlFrames
[info] [DEBUG] [02/11/2020 22:10:46.814] [ConsoleAPIServer-akka.actor.default-dispatcher-13] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from WaitingForNetworkToSendControlFrames to Idle
[info] [DEBUG] [02/11/2020 22:10:46.814] [ConsoleAPIServer-akka.actor.default-dispatcher-13] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForData
[info] [DEBUG] [02/11/2020 22:10:46.815] [ConsoleAPIServer-akka.actor.default-dispatcher-13] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from WaitingForData to Idle
[info] [DEBUG] [02/11/2020 22:10:46.815] [ConsoleAPIServer-akka.actor.default-dispatcher-13] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForData
[info] [DEBUG] [02/11/2020 22:10:46.815] [ConsoleAPIServer-akka.actor.default-dispatcher-13] [Http2ServerDemux(akka://ConsoleAPIServer)] Got unhandled event SettingsAckFrame(List())
[info] [DEBUG] [02/11/2020 22:11:46.985] [ConsoleAPIServer-akka.actor.default-dispatcher-13] [akka://ConsoleAPIServer/system/Materializers/StreamSupervisor-0/flow-8-0-detacher] Aborting tcp connection to /127.0.0.1:56142 because of upstream failure: akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [/127.0.0.1:56142], no bytes passed in the last 1 minute
[info] [DEBUG] [02/11/2020 22:11:46.985] [ConsoleAPIServer-akka.actor.default-dispatcher-13] [TcpConnectionStage$TcpStreamLogic(akka://ConsoleAPIServer)] Aborting connection from 127.0.0.1:56142 because of downstream failure: java.util.concurrent.TimeoutException: No elements passed in the last 1 minute.
[info] [DEBUG] [02/11/2020 22:11:46.987] [ConsoleAPIServer-akka.actor.internal-dispatcher-17] [akka://ConsoleAPIServer/system/IO-TCP/selectors/$a/0] New connection accepted
[info] [DEBUG] [02/11/2020 22:11:46.994] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForNetworkToSendControlFrames
[info] [DEBUG] [02/11/2020 22:11:46.994] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from WaitingForNetworkToSendControlFrames to Idle
[info] [DEBUG] [02/11/2020 22:11:46.995] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForData
[info] [DEBUG] [02/11/2020 22:11:46.995] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from WaitingForData to Idle
[info] [DEBUG] [02/11/2020 22:11:46.995] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [Http2ServerDemux(akka://ConsoleAPIServer)] Changing state from Idle to WaitingForData
[info] [DEBUG] [02/11/2020 22:11:46.996] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [Http2ServerDemux(akka://ConsoleAPIServer)] Got unhandled event SettingsAckFrame(List())
[info] [DEBUG] [02/11/2020 22:12:47.193] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [akka://ConsoleAPIServer/system/Materializers/StreamSupervisor-0/flow-10-0-detacher] Aborting tcp connection to /127.0.0.1:56144 because of upstream failure: akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [/127.0.0.1:56144], no bytes passed in the last 1 minute
[info] [DEBUG] [02/11/2020 22:12:47.194] [ConsoleAPIServer-akka.actor.default-dispatcher-4] [TcpConnectionStage$TcpStreamLogic(akka://ConsoleAPIServer)] Aborting connection from 127.0.0.1:56144 because of downstream failure: java.util.concurrent.TimeoutException: No elements passed in the last 1 minute.

This then keeps repeating, presumably the client is reconnecting but not sending anything.

What I did was:

  1. Start the app
  2. Send an empty message with grpcurl:
    ❯ grpcurl localhost:8080 com.lightbend.console.api.MetricsProtoService/GetMetrics
    Failed to dial target host "localhost:8080": tls: first record does not look like a TLS handshake
  3. Did the same again but this time with the -plaintext flag:
    ❯ grpcurl -plaintext localhost:8080 com.lightbend.console.api.MetricsProtoService/GetMetrics
    ^C
raboof commented 4 years ago

Hi @steinybot, thanks for the report! I can reproduce the issue.

grpcurl depends on the Reflection API. Work on supporting gRPC reflection is underway (https://github.com/akka/akka-grpc/issues/380), but not released yet.

When I try to use grpcurl with a gRPC server that does not support Server Relfection, I can reproduce the issue you describe above. With a gRPC server that has the preliminary support for Server Reflection (currently found on master), it works for me.

jrudolph commented 4 years ago

I looked into the code to figure out what could be the issue last week but didn't see anything obvious. Good that we can reproduce it now!

laszlox commented 4 years ago

Hi,

I'm getting the same error in my helloworld-like akka-grpc server, when a dotnet-core 3.1 client calls it (it works fine from akka-grpc client or for example nodejs client). I'm not sure if it's the same issue as above, because it does not happen always, maybe 1 out 5 times.

akka: 2.6.3 akka-http: 10.1.11

I noticed that the dotnet-core client sends slightly different HTTP2 settings than akka-grpc or nodejs clients, for example: SETTINGS_HEADER_TABLE_SIZE(0). Not sure if it can cause any issue like this in akka-http.

the DEBUG logs:

2020-02-21 11:21:59,818 [11:21:59.817UTC] [DEBUG] TcpListener: New connection accepted 
2020-02-21 11:21:59,832 [11:21:59.832UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForNetworkToSendControlFrames 
2020-02-21 11:21:59,832 [11:21:59.832UTC] [DEBUG] Http2ServerDemux: Changing state from WaitingForNetworkToSendControlFrames to Idle 
2020-02-21 11:21:59,833 [11:21:59.833UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForData 
2020-02-21 11:21:59,834 [11:21:59.833UTC] [DEBUG] Http2ServerDemux: Got 2 settings! 
2020-02-21 11:21:59,834 [11:21:59.834UTC] [DEBUG] Http2ServerDemux: Ignoring setting SETTINGS_ENABLE_PUSH -> 0 (in Demux) 
2020-02-21 11:21:59,834 [11:21:59.834UTC] [DEBUG] Http2ServerDemux: Ignoring setting SETTINGS_HEADER_TABLE_SIZE -> 0 (in Demux) 
2020-02-21 11:21:59,834 [11:21:59.834UTC] [DEBUG] Http2ServerDemux: Changing state from WaitingForData to Idle 
2020-02-21 11:21:59,835 [11:21:59.835UTC] [DEBUG] HeaderCompression$$anon$1: Applied SETTINGS_HEADER_TABLE_SIZE(0) in header compression 
2020-02-21 11:21:59,835 [11:21:59.835UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForData 
2020-02-21 11:21:59,835 [11:21:59.835UTC] [DEBUG] Http2ServerDemux: Updating outgoing connection window by 67043329 to 67108864 
2020-02-21 11:21:59,841 [11:21:59.841UTC] [DEBUG] Http2ServerDemux: Got unhandled event SettingsAckFrame(List()) 
2020-02-21 11:21:59,850 [11:21:59.850UTC] [DEBUG] Http2ServerDemux: Changing state from WaitingForData to Idle 
2020-02-21 11:21:59,850 [11:21:59.850UTC] [DEBUG] Http2ServerDemux: Received DATA 5 for stream [1], remaining window space now 65530, buffered: 5 
2020-02-21 11:21:59,850 [11:21:59.850UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForData 
2020-02-21 11:21:59,850 [11:21:59.850UTC] [DEBUG] Http2ServerDemux: Received DATA 61 for stream [1], remaining window space now 65469, buffered: 66 
2020-02-21 11:21:59,850 [11:21:59.850UTC] [DEBUG] Http2ServerDemux: Dispatched chunk of 66 for stream [1], remaining window space now 65469, buffered: 0 
2020-02-21 11:21:59,850 [11:21:59.850UTC] [DEBUG] Http2ServerDemux: Changing state from WaitingForData to Idle 
2020-02-21 11:21:59,850 [11:21:59.850UTC] [DEBUG] Http2ServerDemux: adjusting con-level window by 0, stream-level window by 446531, remaining window space now 512000, buffered: 0, remaining connection window space now 9999934, total buffered: 0 
2020-02-21 11:21:59,850 [11:21:59.850UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForData 
2020-02-21 11:21:59,851 [11:21:59.851UTC] [DEBUG] Http2ServerDemux: Changing state from WaitingForData to Idle 
2020-02-21 11:21:59,851 [11:21:59.851UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForData 
2020-02-21 11:21:59,851 [11:21:59.851UTC] [DEBUG] Http2ServerDemux: Changing state from WaitingForData to Idle 
2020-02-21 11:21:59,851 [11:21:59.851UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForData 
2020-02-21 11:21:59,852 [11:21:59.852UTC] [DEBUG] Http2ServerDemux: Changing state from WaitingForData to Idle 
2020-02-21 11:21:59,853 [11:21:59.853UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForData 
2020-02-21 11:21:59,853 [11:21:59.853UTC] [DEBUG] Http2ServerDemux: [1] buffered 80 bytes 
2020-02-21 11:21:59,854 [11:21:59.853UTC] [DEBUG] Http2ServerDemux: [1] sending 80 bytes, endStream = false, remaining buffer [0], remaining stream-level WINDOW [65455] 
2020-02-21 11:21:59,854 [11:21:59.853UTC] [DEBUG] Http2ServerDemux: Changing state from WaitingForData to Idle 
2020-02-21 11:21:59,854 [11:21:59.853UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForData 
2020-02-21 11:21:59,854 [11:21:59.854UTC] [DEBUG] Http2ServerDemux: [1] buffered 0 bytes 
2020-02-21 11:21:59,854 [11:21:59.854UTC] [WARN ] Http2ServerDemux: handleOutgoingEnded received unexpectedly in state Closed. This indicates a bug in Akka HTTP, please report it to the issue tracker. 
2020-02-21 11:21:59,854 [11:21:59.854UTC] [DEBUG] Http2ServerDemux: Changing state from WaitingForData to Idle 
2020-02-21 11:21:59,854 [11:21:59.854UTC] [DEBUG] Http2ServerDemux: Changing state from Idle to WaitingForData 
2020-02-21 11:21:59,854 [11:21:59.854UTC] [WARN ] Http2ServerDemux: handleOutgoingEnded received unexpectedly in state Closed. This indicates a bug in Akka HTTP, please report it to the issue tracker. 
raboof commented 4 years ago

hi @laszlox , thanks for joining in! That might be the same issue, but unfortunately that's not possible to tell from just those logs. The fact that you're seeing SETTINGS_HEADER_TABLE_SIZE suggests that you're looking at the communication with something like wireshark. Do you also see a RST_STREAM sent from the client to the server? What error code is associated with? If it's also PROTOCOL_ERROR, it would be interesting to see if we can find anything in the exchange that would justify that error.

Perhaps it would be worth opening a separate issue until we know it's the same thing. There it might be interesting if you could share a wireshark/tcpdump log, and perhaps a minimal client program that shows the issue. I don't really have any dotnet-core experience myself, though... ;)

laszlox commented 4 years ago

Thanks for the quick response!

I created https://github.com/akka/akka-http/issues/2977 with a small server and client project that reproduces the issue. I wasn't running wireshark or anything like that, but I noticed that the dotnet-core http2 connection sends the SETTINGS_HEADER_TABLE_SIZE=0 value, at least in version 3.1. I don't know whether it sent RST_STREAM...

raboof commented 4 years ago

I dug a bit deeper in the grpcurl PROTOCOL_ERROR, and it turns out the http2 client they use for grpc sends a stream reset with type 'protocol error' for 'invalid' headers, and any non-gRPC response is considered 'invalid'.

While it seems questionable to me to report this as a 'protocol error' on the HTTP2 level (as it's an error on the gRPC level), the http2 spec is not super clear on this, so it might be fine.

It might be a bit clearer if we responded with a body with content-type application/grpc - that would tell the client this is a gRPC response. We'd have to figure out what to put in that response body then though (not sure it is allowed to be empty?).

With this background I think we can close this issue when #2976 has been merged

jrudolph commented 4 years ago

Does it have any consequences that grpcurl fails this way? I guess, since it is a one-shot client, it wouldn't matter. We should just make sure we fail more gracefully (as done in #2976).

raboof commented 4 years ago

Does it have any consequences that grpcurl fails this way?

Not that I've seen

ennru commented 4 years ago

Fixed with #2976

gkirill commented 4 years ago

I am getting this error

[2020-11-16 14:29:10,479] [WARN] [akka.http.impl.engine.http2.Http2ServerDemux] - handleOutgoingEnded received unexpectedly in state HalfClosedLocal. This indicates a bug in Akka HTTP, please report it to the issue tracker.
[2020-11-16 14:29:10,483] [WARN] [akka.http.impl.engine.http2.Http2ServerDemux] - handleOutgoingEnded received unexpectedly in state Closed. This indicates a bug in Akka HTTP, please report it to the issue tracker.

My setup is

val akkaVersion = "2.6.9"
val akkaHttpVersion = "10.2.1"

I tried versions of Akka 2.6.8, 2.6.7, 2.6.6 - same error, however 2.6.5 is fine. Literally just using 2.6.5 removes the error. I need to use Akka 2.6.9 because I need this for Akka Persistence Cassandra 1.0.4.

Any ideas on this issue please? Stuck with the project. Wanted to go back to Akka 2.6.5, but Persistence Cassandra needs 2.6.9

On the client side I am using

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty-shaded</artifactId>
  <version>1.30.2</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-protobuf</artifactId>
  <version>1.30.2</version>
</dependency>
jrudolph commented 4 years ago

Thanks for the report, @gkirill. We've did lots of improvements since 10.2.1 in this area. Can you check if using the latest snapshot would fix this?

The latest snapshot version of akka-http is 10.2.1+110-1fbbfffa and you need to add the snapshot resolver:

resolvers += "Akka snapshots" at "https://dl.bintray.com/akka/snapshots"
gkirill commented 4 years ago

@jroper thanks for the quickest response, I gave it a try, and the error is not output anymore, however on the client side I still get the same error (I was getting same error with 10.2.1 on the client). So it seems that it just does not output the error anymore, however the actual error still seems to be there.

2020-11-16T15:33:40.076280200Z io.grpc.StatusRuntimeException: UNIMPLEMENTED
2020-11-16T15:33:40.076287600Z  at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076312400Z  at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:460) ~[grpc-stub-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076321300Z  at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426) ~[grpc-core-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076338900Z  at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66) ~[grpc-core-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076344200Z  at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689) ~[grpc-core-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076349800Z  at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577) ~[grpc-core-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076367900Z  at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751) ~[grpc-core-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076374400Z  at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740) ~[grpc-core-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076380300Z  at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076385300Z  at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[grpc-core-1.30.2.jar!/:1.30.2]
2020-11-16T15:33:40.076390600Z  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
2020-11-16T15:33:40.076395700Z  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
2020-11-16T15:33:40.076401000Z  at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
jrudolph commented 4 years ago

Interesting, @gkirill. If you can reproduce the issue reliably, can you enable debug logging on the server and also akka.http.server.http2.log-frames = on and send us the logs (either here or by email)? More logs from the client might also help. Do you use akka-grpc or use grpc-java directly?

gkirill commented 4 years ago

@jrudolph Thanks for quick response.

Are these the logs? Or should I enable TRACE? Btw, I am trying to do 2-way streaming if it matters. On the client side I am not using Akka at all, it is pure Spring Boot with grpc.

[2020-11-19 13:13:06,105] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-11-19 13:13:06,116] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Updating window for 3 by 0 to 1048576 buffered bytes: 0
[2020-11-19 13:13:06,116] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: OpenReceivingDataFirst -> Open after running class akka.http.impl.engine.http2.Http2StreamHandling$$Lambda$2375/0x0000000100e4a040
[2020-11-19 13:13:06,124] [DEBUG] [akka.stream.Materializer] - [DOWN] Element:    3 HEAD  :status -> 200, content-type -> application/grpc+proto, grpc-encoding -> gzip, date -> Thu, 19 Nov 2020 13:13:06 GMT, server -> akka-http/10.2.1+110-1fbbfffa
[2020-11-19 13:13:06,130] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-11-19 13:13:06,130] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-11-19 13:13:06,130] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: Open -> HalfClosedLocal after running class akka.http.impl.engine.http2.Http2StreamHandling$$Lambda$2375/0x0000000100e4a040
[2020-11-19 13:13:06,138] [DEBUG] [akka.stream.Materializer] - [DOWN] Element:    3 HEAD ES grpc-status -> 12, date -> Thu, 19 Nov 2020 13:13:06 GMT
[2020-11-19 13:13:06,140] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-11-19 13:13:06,176] [DEBUG] [akka.stream.Materializer] - [ UP ] Element:    3 RSET  CANCEL
[2020-11-19 13:13:06,177] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: HalfClosedLocal -> Closed after running class akka.http.impl.engine.http2.Http2StreamHandling$$Lambda$2375/0x0000000100e4a040
[2020-11-19 13:13:06,184] [DEBUG] [akka.stream.Materializer] - [ UP ] Element:    0 GOAY  lastStreamId = 0, errorCode = NO_ERROR, debug = 
[2020-11-19 13:13:06,184] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Got unhandled event GoAwayFrame(0,NO_ERROR,debug:<hidden>)
[2020-11-19 13:13:06,189] [DEBUG] [akka.stream.Materializer] - [ UP ] Upstream finished.
[2020-11-19 13:13:06,192] [DEBUG] [akka.stream.Materializer] - [DOWN] Upstream finished.
gkirill commented 4 years ago

@jrudolph Here are some logs from the client side, not sure if there is anything of interest though.

2020-11-19 13:21:29.714 DEBUG 6 --- [ault-executor-0] i.grpc.netty.shaded.io.grpc.netty.Utils  : Using custom allocator: forceHeapBuffer=false, defaultPreferDirect=true
2020-11-19 13:21:29.715 DEBUG 6 --- [ault-executor-0] i.grpc.netty.shaded.io.grpc.netty.Utils  : Creating allocator, preferDirect=true
2020-11-19 13:21:29.715 DEBUG 6 --- [ault-executor-0] i.grpc.netty.shaded.io.grpc.netty.Utils  : Forcing maxOrder=8
2020-11-19 13:21:29.728  INFO 6 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2020-11-19 13:21:29.730 DEBUG 6 --- [ault-executor-0] i.g.n.s.i.n.channel.DefaultChannelId     : -Dio.netty.processId: 6 (auto-detected)
2020-11-19 13:21:29.748 DEBUG 6 --- [ault-executor-0] i.g.n.s.i.n.channel.DefaultChannelId     : -Dio.netty.machineId: 02:42:c0:ff:fe:a8:00:06 (auto-detected)
2020-11-19 13:21:29.791 DEBUG 6 --- [ault-executor-0] i.g.n.s.io.netty.buffer.ByteBufUtil      : -Dio.netty.allocator.type: pooled
2020-11-19 13:21:29.791 DEBUG 6 --- [ault-executor-0] i.g.n.s.io.netty.buffer.ByteBufUtil      : -Dio.netty.threadLocalDirectBufferSize: 0
2020-11-19 13:21:29.792 DEBUG 6 --- [ault-executor-0] i.g.n.s.io.netty.buffer.ByteBufUtil      : -Dio.netty.maxThreadLocalCharBufferSize: 16384
2020-11-19 13:21:29.805 DEBUG 6 --- [ault-executor-0] i.g.netty.shaded.io.netty.util.Recycler  : -Dio.netty.recycler.maxCapacityPerThread: 4096
2020-11-19 13:21:29.812  INFO 6 --- [           main] c.l.l.MyConnectorApplication           : Started MyConnectorApplication in 2.872 seconds (JVM running for 4.741)
2020-11-19 13:21:29.806 DEBUG 6 --- [ault-executor-0] i.g.netty.shaded.io.netty.util.Recycler  : -Dio.netty.recycler.maxSharedCapacityFactor: 2
2020-11-19 13:21:29.817 DEBUG 6 --- [ault-executor-0] i.g.netty.shaded.io.netty.util.Recycler  : -Dio.netty.recycler.linkCapacity: 16
2020-11-19 13:21:29.817 DEBUG 6 --- [ault-executor-0] i.g.netty.shaded.io.netty.util.Recycler  : -Dio.netty.recycler.ratio: 8
2020-11-19 13:21:29.880 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] OUTBOUND SETTINGS: ack=false settings={ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}
2020-11-19 13:21:29.907 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=983041
2020-11-19 13:21:30.048 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] INBOUND SETTINGS: ack=false settings={MAX_CONCURRENT_STREAMS=256}
2020-11-19 13:21:30.049 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] OUTBOUND SETTINGS: ack=true
2020-11-19 13:21:30.076 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] INBOUND SETTINGS: ack=true
2020-11-19 13:21:30.083 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] OUTBOUND HEADERS: streamId=3 headers=GrpcHttp2OutboundHeaders[:authority: grpcservice-1:9080, :path: /mypackage.MyServiceGrpcService/Connect, :method: POST, :scheme: http, content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.30.2, grpc-accept-encoding: gzip, authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ0ZW5hbnRJZCI6ImU4MjM1Yjc0LTAxNzYtNDRkOC05OWFiLTM4MTk5ODdhOTIxNyIsImRvbWFpbklkIjoiNGRmZDY4ODEtMjI3OS00YmVmLTk1NzItMjM5NzI2MjI2YmFhIn0.I62dh_DgPFF-z0mO_pCznkpMUa5pTOZzjNkNwtubsps, x-connector-name: Connector-1] streamDependency=0 weight=16 exclusive=false padding=0 endStream=false
2020-11-19 13:21:30.415 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc+proto, grpc-encoding: gzip, date: Thu, 19 Nov 2020 13:21:30 GMT, server: akka-http/10.2.1+110-1fbbfffa] padding=0 endStream=false
2020-11-19 13:21:30.427 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[grpc-status: 12, date: Thu, 19 Nov 2020 13:21:30 GMT] padding=0 endStream=true
2020-11-19 13:21:30.429 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] OUTBOUND RST_STREAM: streamId=3 errorCode=8
2020-11-19 13:21:30.430 ERROR 6 --- [ault-executor-1] c.l.MyConnector.grpc.GrpcService       : onError

io.grpc.StatusRuntimeException: UNIMPLEMENTED
    at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.30.2.jar!/:1.30.2]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:460) ~[grpc-stub-1.30.2.jar!/:1.30.2]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426) ~[grpc-core-1.30.2.jar!/:1.30.2]
    at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66) ~[grpc-core-1.30.2.jar!/:1.30.2]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689) ~[grpc-core-1.30.2.jar!/:1.30.2]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577) ~[grpc-core-1.30.2.jar!/:1.30.2]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751) ~[grpc-core-1.30.2.jar!/:1.30.2]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740) ~[grpc-core-1.30.2.jar!/:1.30.2]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.30.2.jar!/:1.30.2]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[grpc-core-1.30.2.jar!/:1.30.2]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2020-11-19 13:21:30.433 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : Network channel being closed by the application.
2020-11-19 13:21:30.434 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xdce54446, L:/192.168.0.6:49152 - R:grpcservice-1/192.168.0.2:9080] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=0 length=0 bytes=
2020-11-19 13:21:30.436 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : Network channel is closed
2020-11-19 13:21:30.449 DEBUG 6 --- [ault-executor-1] c.l.MyConnector.grpc.GrpcService       : GrpcService#connectWithDelay
2020-11-19 13:21:31.452 DEBUG 6 --- [-worker-ELG-1-2] i.g.n.s.io.netty.buffer.PoolThreadCache  : Freed 6 thread-local buffer(s) from thread: grpc-default-worker-ELG-1-2
2020-11-19 13:21:34.802 DEBUG 6 --- [   scheduling-1] c.l.MyConnector.grpc.GrpcService       : GrpcService#sendHeartbeat
2020-11-19 13:21:34.812 DEBUG 6 --- [   scheduling-1] c.l.MyConnector.grpc.GrpcService       : GrpcService#sendMessageFromConnector messageType: HEARTBEAT
jrudolph commented 4 years ago

Thanks, @gkirill, that looks indeed quite similar to what was originally reported in this ticket. It seems the client does not like the response and closes the stream and connection forcefully.

The grpc-status -> 12 code the server is reporting is an UNIMPLEMENTED error code.

What version of akka-grpc do you use for the server?

@raboof any idea in which scenarios akka-grpc returns error code 12?

jrudolph commented 4 years ago

The most likely case for UNIMPLEMENTED is just that you are calling a method that the server doesn't know about.

raboof commented 4 years ago

I don't see an obvious reason for this error other than the service not implementing /mypackage.MyServiceGrpcService/Connect, indeed... what is the server side implemented with? have you tried calling it with grpccurl? any chance you could reproduce this problem with 2 'minimal' applications?

jrudolph commented 4 years ago

Proposed https://github.com/akka/akka-grpc/pull/1208 to add a grpc-message header to responses to clarify for clients what the problem is, I guess such an error is easy enough to provoke, e.g. because of a typo somewhere or just pointing to the wrong server.

gkirill commented 4 years ago

@jrudolph @raboof Thanks, guys for your replies, really appreciated. Your proposed reason is very unlikely because as I mentioned above, everything worked just fine with Akka 2.6.5. There were no any issues and client and server communicated with each over absolutely fine. I only started getting this error when I switched to Akka 2.6.6 (2.6.7, 2.6.8, 2.6.9) on the server side, without changing server side implementation at all.

I use same proto file for both server and client (only java_package is a bit different)

service MyServiceGrpcService {
    rpc Connect (stream MessageFromConnector) returns (stream MessageToConnector) {}
}

On the server side I use "com.lightbend.akka.grpc" % "sbt-akka-grpc" % "1.0.2" plugin for Grpc generation.

Here is an abstract of my server implementation, note that I am using metadata for authentication purposes. I do have connect method implemented (I tried both Connect and connect in proto file, it does not matter).

class MyServiceGrpcServiceImpl(context: ActorContext[Nothing]) extends MyServiceGrpcServicePowerApi {

  implicit val materializer: ActorSystem[Nothing] = context.system
  import context.executionContext

  private val authorizationHeaderKey = "Authorization"
  private val connectorNameHeaderKey = "X-Connector-Name"

  private val responseQueueBufferSize = ConfigFactory.load().getInt("grpc.outgoing.stream.buffer")

  override def connect(in: Source[MessageFromConnector, NotUsed], metadata: Metadata): Source[MessageToConnector, NotUsed] = {

    val authHeader = metadata.getText(authorizationHeaderKey)
    if (authHeader.isEmpty) {
      throw new GrpcServiceException(Status.UNAUTHENTICATED)
    }

    val token = ClientsAuth.Connector.verifyAuthorizationHeader(authHeader.get)
    if (token.isFailure) {
      throw new GrpcServiceException(Status.UNAUTHENTICATED)
    }
    context.log.info(s"Authenticated connector $token")

    // For some reason connectorNameHeaderKey works only if is in lower case
    val connectorNameOption = metadata.getText(connectorNameHeaderKey.toLowerCase)
    if (connectorNameOption.isEmpty) {
      throw new GrpcServiceException(Status.INVALID_ARGUMENT)
    }
....

This is how I start the server

object AppSupervisor {

  def apply(): Behavior[Nothing] =
    Behaviors.setup[Nothing](context => {

      val sharding = ClusterSharding(context.system)

      sharding.init(Entity(TenantActor.EntityKey) { entityContext =>
        TenantActor(PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
      })

      val routes = new Web.Routes(sharding)(context.system)
      val port = ConfigFactory.load().getInt("web.http.port")
      new Web.Server(routes.routes, port, context.system).start()

      implicit val classicSystem: ActorSystem[Nothing] = context.system

      Http()
        .newServerAt("0.0.0.0", ConfigFactory.load().getInt("grpc.port"))
        .bind(MyServiceServicePowerApiHandler(new MyServiceGrpcServiceImpl(context)))

      Behaviors.empty

    })

}

Client is implemented with Java/Spring Boot

pom.xml

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.30.2</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.30.2</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.30.2</version>
</dependency>

Here is how I establish connection on the client, please note that I use metadata for authentication.

private void connect() {
    log.debug("GrpcService#connect");

    final ManagedChannel channel = ManagedChannelBuilder
        .forAddress(GRPC_HOST, GRPC_PORT)
        .usePlaintext() // FIXME
        .build();

    final MyServiceGrpcServiceGrpc.MyServiceGrpcServiceStub stub = MyServiceGrpcServiceGrpc
        .newStub(channel)
        .withCallCredentials(credentialsProvider);

    messageFromConnectorStreamObserver =
        stub.connect(new StreamObserver<MessageToConnector>() {

            @Override
            public void onNext(final MessageToConnector messageToConnector) {
                log.debug("onNext {}", messageToConnector);
                // Starting processing of incoming message in a separate thread to avoid blocking
                // processing of subsequent messages
                executor.execute(() -> processMessageToConnector(messageToConnector));
            }

            @SneakyThrows
            @Override
            public void onError(Throwable throwable) {
                messageFromConnectorStreamObserver = null;
                log.error("onError", throwable);
                channel.shutdown();
                channel.awaitTermination(GRPC_CHANNEL_TERMINATION_TIMEOUT, TimeUnit.MILLISECONDS);
                connectWithDelay();
            }

            @Override
            public void onCompleted() {
                log.warn("onCompleted - the stream is never supposed to be completed");
            }

        });

}

If there are no ideas, I guess I will need to start a clean project trying to reproduce the issue.

jrudolph commented 4 years ago

Thanks, guys for your replies, really appreciated. Your proposed reason is very unlikely because as I mentioned above, everything worked just fine with Akka 2.6.5. There were no any issues and client and server communicated with each over absolutely fine. I only started getting this error when I switched to Akka 2.6.6 (2.6.7, 2.6.8, 2.6.9) on the server side, without changing server side implementation at all.

I looked into changes between 2.6.5 and 2.6.6 but it seems quite unlikely that those changes could have such an effect as well. :)

If there are no ideas, I guess I will need to start a clean project trying to reproduce the issue.

That would be great.

Other than that, what you could do is making a copy of the generated MyServiceServicePowerApiHandler.scala and use the copy instead to see if the apply/partial method is where the NotImplementedError error is generated (or which request is received there).

gkirill commented 4 years ago

@jrudolph I created a very simple version of my app here https://github.com/gkirill/akka-grpc-example and I should say at least with this setup I was not able to reproduce the issue. Communication between the server and the client works just fine, even using docker-compose. I had an idea that maybe it was caused by not using TLS for non-localhost domains (because http2 is not supposed to work without TLS), but this assumption was not correct.

I will continue to add other feature of my app to this example until I either reproduce it or I will build my app! Thanks for your answers, guys.

gkirill commented 4 years ago

Just gonna post here my build.sbt file contents, with this setup I was able to get the following to work

  1. Akka Cluster
  2. Akka Persistence with Cassandra
  3. Writing and reading domain data (not related to Akka Persistence) to/from Cassandra
  4. HTTP server
  5. GRPC server with support for metadata and 2-way streams
  6. Works in Docker-compose setup (at least locally so far)

I do not know what was causing the original issue though.


lazy val akkaHttpVersion = "10.2.1"
lazy val akkaVersion    = "2.6.10"

enablePlugins(AkkaGrpcPlugin)
akkaGrpcCodeGeneratorSettings += "server_power_apis"

lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization    := "com.example",
      scalaVersion    := "2.13.3"
    )),
    name := "example-service",
    libraryDependencies ++= Seq(

      "com.typesafe.akka" %% "akka-http"                   % akkaHttpVersion,
      "com.typesafe.akka" %% "akka-http-spray-json"        % akkaHttpVersion,
      "com.typesafe.akka" %% "akka-http2-support"          % akkaHttpVersion,
      "com.typesafe.akka" %% "akka-serialization-jackson"  % akkaVersion,
      "com.typesafe.akka" %% "akka-actor-typed"            % akkaVersion,
      "com.typesafe.akka" %% "akka-stream"                 % akkaVersion,
      "com.typesafe.akka" %% "akka-discovery"              % akkaVersion,
      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
      "com.typesafe.akka" %% "akka-persistence-cassandra"  % "1.0.4",
      "com.typesafe.akka" %% "akka-persistence-typed"      % akkaVersion,
      "com.typesafe.akka" %% "akka-persistence-query"      % akkaVersion,
      "com.typesafe.akka" %% "akka-cluster-tools"          % akkaVersion,

      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "2.0.2",

      "ch.qos.logback"    % "logback-classic"           % "1.2.3",

    )
  )

assemblyMergeStrategy in assembly := {
  // https://stackoverflow.com/a/30713280/980452
  case PathList("reference.conf") => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
gkirill commented 3 years ago

@jrudolph @raboof guys, I have had some new development regarding the issue (connection gets closed with HalfClosedLocal -> Closed) and it turns out I still have it. I was able to boil down to a single string in my code which makes the difference between having the issue and not.

First example of code and logs is when the issue does not appear and everything works as expected:

Code and logs which works as expected

override def connect(in: Source[MessageFromConnector, NotUsed], metadata: Metadata): Source[MessageToConnector, NotUsed] = {

    val log = LoggerFactory.getLogger(s"GrpcConnection")
    log.debug(s"Established")

    in.runForeach(messageFromConnector => {
        log.debug(s"Received ${messageFromConnector.messageType}")
    })

    Source.never

}   

Connection gets established once and I start receiving heartbeats.

[2020-12-12 14:54:58,200] [DEBUG] [akka.io.TcpListener] - New connection accepted
[2020-12-12 14:54:58,236] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForNetworkToSendControlFrames
[2020-12-12 14:54:58,244] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForNetworkToSendControlFrames to Idle
[2020-12-12 14:54:58,246] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:54:58,250] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Got 4 settings!
[2020-12-12 14:54:58,260] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Ignoring setting SETTINGS_ENABLE_PUSH -> 0 (in Demux)
[2020-12-12 14:54:58,261] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Setting max concurrent streams to 0 (not enforced)
[2020-12-12 14:54:58,261] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Setting initial window to 1048576
[2020-12-12 14:54:58,272] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Ignoring setting SETTINGS_MAX_HEADER_LIST_SIZE -> 8192 (in Demux)
[2020-12-12 14:54:58,272] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-12-12 14:54:58,274] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:54:58,277] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Updating outgoing connection window by 983041 to 1048576
[2020-12-12 14:54:58,304] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: Idle -> ReceivingDataFirst
[2020-12-12 14:54:58,340] [DEBUG] [GrpcConnection] - Established
[2020-12-12 14:54:58,365] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: ReceivingDataFirst -> Open
[2020-12-12 14:54:58,382] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-12-12 14:54:58,385] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:55:02,853] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-12-12 14:55:02,853] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Received DATA 7 for stream [3], remaining window space now 65528, buffered: 7
[2020-12-12 14:55:02,853] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Dispatched chunk of 7 for stream [3], remaining window space now 65528, buffered: 0
[2020-12-12 14:55:02,853] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForNetworkToSendControlFrames
[2020-12-12 14:55:02,854] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - adjusting con-level window by 0, stream-level window by 446472, remaining window space now 512000, buffered: 0, remaining connection window space now 9999993, total buffered: 0
[2020-12-12 14:55:02,855] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: Open -> Open
[2020-12-12 14:55:02,856] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForNetworkToSendControlFrames to Idle
[2020-12-12 14:55:02,857] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:55:02,858] [DEBUG] [GrpcConnection] - Received HEARTBEAT
[2020-12-12 14:55:12,784] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Received DATA 7 for stream [3], remaining window space now 511993, buffered: 7
[2020-12-12 14:55:12,786] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Dispatched chunk of 7 for stream [3], remaining window space now 511993, buffered: 0
[2020-12-12 14:55:12,786] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - adjusting con-level window by 0, stream-level window by 0, remaining window space now 511993, buffered: 0, remaining connection window space now 9999986, total buffered: 0
[2020-12-12 14:55:12,786] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: Open -> Open
[2020-12-12 14:55:12,785] [DEBUG] [GrpcConnection] - Received HEARTBEAT

Code and logs which cause the issue

The difference is that I am trying to spawn an actor inside connect method. That is it. The context is passed as a constructor argument to the implementation of the Grpc service.

override def connect(in: Source[MessageFromConnector, NotUsed], metadata: Metadata): Source[MessageToConnector, NotUsed] = {

    val log = LoggerFactory.getLogger(s"GrpcConnection")
    log.debug(s"Established")

    in.runForeach(messageFromConnector => {
        log.debug(s"Received ${messageFromConnector.messageType}")
    })

    val connectionBehavior = Behaviors.receiveMessage[ConnectionApi.Command] {
        case _ =>
            log.info("Sending message to connector")
            Behaviors.same
    }

    val connectionActor = context.spawnAnonymous(connectionBehavior)

    Source.never

}  

After each connection attempt from the client, the connection gets closed, then client attempts to connect again and this repeats.

[2020-12-12 14:58:10,554] [DEBUG] [akka.io.TcpListener] - New connection accepted
[2020-12-12 14:58:10,583] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForNetworkToSendControlFrames
[2020-12-12 14:58:10,598] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Got 4 settings!
[2020-12-12 14:58:10,598] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Ignoring setting SETTINGS_ENABLE_PUSH -> 0 (in Demux)
[2020-12-12 14:58:10,599] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Setting max concurrent streams to 0 (not enforced)
[2020-12-12 14:58:10,599] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Setting initial window to 1048576
[2020-12-12 14:58:10,599] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Ignoring setting SETTINGS_MAX_HEADER_LIST_SIZE -> 8192 (in Demux)
[2020-12-12 14:58:10,600] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Updating outgoing connection window by 983041 to 1048576
[2020-12-12 14:58:10,605] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForNetworkToSendControlFrames to Idle
[2020-12-12 14:58:10,609] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:58:10,651] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: Idle -> ReceivingDataFirst
[2020-12-12 14:58:10,694] [DEBUG] [GrpcConnection] - Established
[2020-12-12 14:58:10,715] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: ReceivingDataFirst -> Open
[2020-12-12 14:58:10,715] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-12-12 14:58:10,717] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:58:10,723] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: Open -> HalfClosedLocal
[2020-12-12 14:58:10,728] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-12-12 14:58:10,731] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:58:10,741] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: HalfClosedLocal -> Closed
[2020-12-12 14:58:10,747] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Got unhandled event GoAwayFrame(0,NO_ERROR,debug:<hidden>)
[2020-12-12 14:58:15,770] [DEBUG] [akka.io.TcpListener] - New connection accepted
[2020-12-12 14:58:15,776] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForNetworkToSendControlFrames
[2020-12-12 14:58:15,777] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForNetworkToSendControlFrames to Idle
[2020-12-12 14:58:15,777] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:58:15,778] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Got 4 settings!
[2020-12-12 14:58:15,778] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Ignoring setting SETTINGS_ENABLE_PUSH -> 0 (in Demux)
[2020-12-12 14:58:15,779] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Setting max concurrent streams to 0 (not enforced)
[2020-12-12 14:58:15,779] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Setting initial window to 1048576
[2020-12-12 14:58:15,779] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Ignoring setting SETTINGS_MAX_HEADER_LIST_SIZE -> 8192 (in Demux)
[2020-12-12 14:58:15,780] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-12-12 14:58:15,780] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:58:15,780] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Updating outgoing connection window by 983041 to 1048576
[2020-12-12 14:58:15,790] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: Idle -> ReceivingDataFirst
[2020-12-12 14:58:15,791] [DEBUG] [GrpcConnection] - Established
[2020-12-12 14:58:15,794] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: ReceivingDataFirst -> Open
[2020-12-12 14:58:15,794] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-12-12 14:58:15,794] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:58:15,795] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: Open -> HalfClosedLocal
[2020-12-12 14:58:15,795] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from WaitingForData to Idle
[2020-12-12 14:58:15,796] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Changing state from Idle to WaitingForData
[2020-12-12 14:58:15,800] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Incoming side of stream [3] changed state: HalfClosedLocal -> Closed
[2020-12-12 14:58:15,800] [DEBUG] [akka.http.impl.engine.http2.Http2ServerDemux] - Got unhandled event GoAwayFrame(0,NO_ERROR,debug:<hidden>)

My goal is to represent this Grpc connection as an actor which would receive messages from the stream and which would also be able to send messages back to connector, but it seems I cannot spawn an actor inside connect method implementation.