Open sam0jones0 opened 1 month ago
Hey @sam0jones0, thanks for raising this. Quick question just to clarify where the error might lie here: I guess this error is not happening when you use either Ember + HTTP PubSub or gRPC PubSub + any other client, right?
Sorry, just saw the "Try using http only (no gRPC)" part while re-reading the text.
Thanks for getting back so quickly, yeah it's odd. We've tried EmberClient and http4s-netty, same thing.
Here are a few more logs leading up to the issue occurring:
2024-10-16 13:21:45.902 - HTTP/2.0 200 OK - Headers(content-disposition: attachment, content-type: application/grpc, date: Wed, 16 Oct 2024 12:21:45 GMT) body="*
2024-10-16 13:21:45.900 - tempLoggerMiddleWare Log Request Body: [Request(method=POST, uri=https://pubsub.googleapis.com/google.pubsub.v1.Subscriber/Pull, httpVersion=HTTP/2.0, headers=Headers(te: trailers, grpc-encod
2024-10-16 13:21:45.905 - pubsub.googleapis.com:443 Write - Headers(identifier=143, dependency=None, endStream=false, endHeaders=true, headerBlock=ByteVector(833 bytes, #-697511976), padding=None)
2024-10-16 13:21:45.907 - HTTP/2.0 POST https://pubsub.googleapis.com/google.pubsub.v1.Subscriber/Pull Headers(te: trailers, grpc-encoding: identity, grpc-accept-encoding: identity, Content-Type: application/grpc, Auth
2024-10-16 13:21:45.908 - pubsub.googleapis.com:443 Write - Data(identifier=143, data=ByteVector(107 bytes, [REDACTED]
2024-10-16 13:32:48.839 - Created Connection - RequestKey: http://metadata.google.internal
2024-10-16 13:32:48.840 - Connection Taken - Key: http://metadata.google.internal - Reused: false - PoolState: (0,Map())
2024-10-16 13:32:48.841 - Created Connection - RequestKey: http://metadata.google.internal
2024-10-16 13:32:48.841 - Connection Taken - Key: http://metadata.google.internal - Reused: false - PoolState: (0,Map())
2024-10-16 13:32:48.842 - HTTP/1.1 GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token Headers(Metadata-Flavor: Google, Accept: application/json) body=*"
2024-10-16 13:32:48.844 - HTTP/1.1 GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token Headers(Metadata-Flavor: Google, Accept: application/json) body=*"
2024-10-16 13:32:48.940 - HTTP/1.1 200 OK Headers(Content-Type: application/json, Metadata-Flavor: Google, Server: GKE Metadata Server, Date: Wed, 16 Oct 2024 12:32:48 GMT, Content-Length: 1083) body="{"access_token":"
2024-10-16 13:32:48.941 - HTTP/1.1 200 OK Headers(Content-Type: application/json, Metadata-Flavor: Google, Server: GKE Metadata Server, Date: Wed, 16 Oct 2024 12:32:48 GMT, Content-Length: 1083) body="{"access_token":"
2024-10-16 13:32:48.941 - Refreshed GCP Pubsub Token with duration 3595 seconds]
2024-10-16 13:32:48.941 - Refreshed GCP Pubsub Token with duration 3595 seconds]
2024-10-16 13:33:19.005 - Shutting Down Connection - RequestKey: http://metadata.google.internal
2024-10-16 13:33:23.061 - Shutting Down Connection - RequestKey: http://metadata.google.internal
2024-10-16 14:32:43.974 - Created Connection - RequestKey: http://metadata.google.internal
2024-10-16 14:32:43.974 - Connection Taken - Key: http://metadata.google.internal - Reused: false - PoolState: (0, Map())
2024-10-16 14:32:43.975 - Created Connection - RequestKey: http://metadata.google.internal
2024-10-16 14:32:43.975 - Connection Taken - Key: http://metadata.google.internal - Reused: false - PoolState: (0, Map())
2024-10-16 14:32:43.976 - HTTP/1.1 GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token Headers(Metadata-Flavor: Google, Accept: application/json) body=*"
2024-10-16 14:32:43.976 - HTTP/1.1 GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token Headers(Metadata-Flavor: Google, Accept: application/json) body=*"
2024-10-16 14:32:44.080 - HTTP/1.1 200 OK Headers(Content-Type: application/json, Metadata-Flavor: Google, Server: GKE Metadata Server, Date: Wed, 16 Oct 2024 13:32:44 GMT, Content-Length: 1083) body="{"access_token":"
2024-10-16 14:32:44.081 - Refreshed GCP Pubsub Token with duration 3595 seconds]
2024-10-16 14:32:44.081 - Refreshed GCP Pubsub Token with duration 3595 seconds]
2024-10-16 14:33:14.158 - Shutting Down Connection - RequestKey: http://metadata.google.internal
2024-10-16 14:33:18.216 - Shutting Down Connection - RequestKey: http://metadata.google.internal
I'll let you know if I figure anything out.
My guess is HTTP/2 GoAway frames are cancelling the stream somehow. As I mentioned above, I added logging to the top-level Stream:
stream.handleErrorWith { error =>
  Stream.eval(logger.error(error)(s"Stream error with reason: [$error]"))
}
And saw
"java.util.concurrent.CancellationException: Received GoAway, cancelling: GoAway(identifier=0, lastStreamId=581, errorCode=NoError, additionalDebugData=Some(ByteVector(7 bytes, 0x6d61785f616765)))
at org.http4s.ember.core.h2.H2Stream$State.cancelWith(H2Stream.scala:444)
at org.http4s.ember.core.h2.H2Stream.receiveGoAway$$anonfun$2(H2Stream.scala:371)
at rethrow$extension @ fs2.Compiler$Target.compile$$anonfun$1(Compiler.scala:157)
at get @ fs2.internal.Scope.openScope(Scope.scala:275)
at flatMap @ org.http4s.ember.core.h2.H2Stream.getResponse(H2Stream.scala:413)
at onError$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:371)
at main$ @ [...].Main$.main(Main.scala:22)
at main$ @ [...].Main$.main(Main.scala:22)
"
I would have thought EmberClient manages connection lifecycles for you?
Yeah, me too. The odd thing is we also have several subscribers using Ember on high-demand topics and haven't observed this issue... What's the size of the subscription it is connecting to? It also seems you are making several refreshes of the GCP token too close to one another; is that expected?
It's quite a small subscription, low traffic. Maybe 1-2 TPS.
The double refreshing is interesting and I'm looking into that now. Perhaps I've somehow doubled up the stream and the 'second' stream is writing to the closed connection, triggering the crash.
That would potentially explain logs like this
writeLoop terminated
"java.io.IOException: Broken pipe
at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:97)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:60)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishWrite(UnixAsynchronousSocketChannelImpl.java:602)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:194)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:217)
at java.base/sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:306)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
at delay @ fs2.io.net.SocketCompanionPlatform$AsyncSocket.go$2$$anonfun$1(SocketPlatform.scala:131)
at async @ fs2.io.net.SocketCompanionPlatform$AsyncSocket.go$2(SocketPlatform.scala:131)
at flatMap @ fs2.io.net.SocketCompanionPlatform$AsyncSocket.go$2(SocketPlatform.scala:135)
at delay @ fs2.io.net.SocketCompanionPlatform$AsyncSocket.write(SocketPlatform.scala:138)
at flatMap @ fs2.io.net.SocketCompanionPlatform$AsyncSocket.write(SocketPlatform.scala:138)
at getAndSet @ org.typelevel.keypool.KeyPool$.destroy(KeyPool.scala:120)
at deferred @ fs2.internal.InterruptContext$.apply$$anonfun$1(InterruptContext.scala:114)
"
Not sure if it's related, but this issue mentioned using client.stream instead of client.run; I don't think it quite matches the situation I have, though.
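For context, my understanding of the difference that issue was about (an illustrative sketch only, using plain http4s API):

import cats.effect.IO
import fs2.Stream
import org.http4s.Request
import org.http4s.client.Client

// client.run scopes the response to a Resource: the body must be consumed inside `use`.
def viaRun(client: Client[IO], req: Request[IO]): IO[String] =
  client.run(req).use(_.as[String])

// client.stream ties the response lifetime to the enclosing Stream scope instead.
def viaStream(client: Client[IO], req: Request[IO]): Stream[IO, String] =
  client.stream(req).evalMap(_.as[String])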
The bug appears when Pubsub sends a GoAway(max_age) after 1 hour.
This consistently occurs at T+1h
Created Connection - RequestKey: [REDACTED]
Connection Taken - Key: [REDACTED] - Reused: false - PoolState: (0, Map())
Created Connection - RequestKey: [REDACTED]
Connection Taken - Key: [REDACTED] - Reused: false - PoolState: (0, Map())
Refreshed GCP Pubsub Token with duration 3595 seconds
Refreshed GCP Pubsub Token with duration 3595 seconds
pubsub.googleapis.com:443 Read - Headers(identifier=691, dependency=None, endStream=false, endHeaders=true, headerBlock=[REDACTED], padding=None)
pubsub.googleapis.com:443 Read - Data(identifier=691, data=[REDACTED], pad=None, endStream=false)
pubsub.googleapis.com:443 Read - Headers(identifier=691, dependency=None, endStream=true, endHeaders=true, headerBlock=[REDACTED], padding=None)
pubsub.googleapis.com:443 Read - Ping
pubsub.googleapis.com:443 Write - Ping.Ack
pubsub.googleapis.com:443 Write - Headers(identifier=693, dependency=None, endStream=false, endHeaders=true, headerBlock=[REDACTED], padding=None)
pubsub.googleapis.com:443 Write - Data(identifier=693, data=[REDACTED])
pubsub.googleapis.com:443 Read - GoAway(identifier=0, lastStreamId=891, errorCode=NoError, additionalDebugData=[REDACTED])
pubsub.googleapis.com:443 Read - Ping
pubsub.googleapis.com:443 Write - Ping.Ack
pubsub.googleapis.com:443 Write - Ping.Ack
pubsub.googleapis.com:443 Read - GoAway(identifier=0, lastStreamId=693, errorCode=EnhanceYourCalm, additionalDebugData=[REDACTED])
Request method=POST uri=https://pubsub.googleapis.com/[REDACTED] headers=[REDACTED] threw an exception on attempt #1. Giving up
Connection pubsub.googleapis.com:443 readLoop Terminated with empty
writeLoop terminated
Restarting stream after [REDACTED] nanoseconds as it failed, 665 restart(s) left out of 666.
pubsub.googleapis.com:443 Write - Settings(SettingsEnablePush(false))
Note the double Ping.Ack leading to GoAway(EnhanceYourCalm / too_many_pings).
This consistently occurs at T+2h
Created Connection - RequestKey: [REDACTED]
Connection Taken - Key: [REDACTED] - Reused: false - PoolState: (0, Map())
Created Connection - RequestKey: [REDACTED]
Connection Taken - Key: [REDACTED] - Reused: false - PoolState: (0, Map())
Refreshed GCP Pubsub Token with duration 3595 seconds
Refreshed GCP Pubsub Token with duration 3595 seconds
pubsub.googleapis.com:443 Read - Headers(identifier=409, dependency=None, endStream=false, endHeaders=true, headerBlock=[REDACTED], padding=None)
pubsub.googleapis.com:443 Read - Data(identifier=409, data=[REDACTED], pad=None, endStream=false)
pubsub.googleapis.com:443 Read - Headers(identifier=409, dependency=None, endStream=true, endHeaders=true, headerBlock=[REDACTED], padding=None)
pubsub.googleapis.com:443 Read - Ping
pubsub.googleapis.com:443 Write - Ping.Ack
pubsub.googleapis.com:443 Write - Headers(identifier=411, dependency=None, endStream=false, endHeaders=true, headerBlock=[REDACTED], padding=None)
pubsub.googleapis.com:443 Write - Data(identifier=411, data=[REDACTED])
pubsub.googleapis.com:443 Read - Headers(identifier=411, dependency=None, endStream=false, endHeaders=true, headerBlock=[REDACTED], padding=None)
pubsub.googleapis.com:443 Read - Data(identifier=411, data=[REDACTED], pad=None, endStream=false)
pubsub.googleapis.com:443 Read - Headers(identifier=411, dependency=None, endStream=true, endHeaders=true, headerBlock=[REDACTED], padding=None)
pubsub.googleapis.com:443 Read - Ping
pubsub.googleapis.com:443 Read - GoAway(identifier=0, lastStreamId=611, errorCode=NoError, additionalDebugData=[REDACTED])
pubsub.googleapis.com:443 Read - Ping
pubsub.googleapis.com:443 Write - Ping.Ack
pubsub.googleapis.com:443 Write - Ping.Ack
pubsub.googleapis.com:443 Write - Ping.Ack
pubsub.googleapis.com:443 Read - GoAway(identifier=0, lastStreamId=411, errorCode=EnhanceYourCalm, additionalDebugData=[REDACTED])
Connection pubsub.googleapis.com:443 readLoop Terminated with empty
writeLoop terminated
Shutting Down Connection - RequestKey: [REDACTED]
Shutting Down Connection - RequestKey: [REDACTED]
[... + 1h ...]
Created Connection - RequestKey: [REDACTED]
Connection Taken - Key: [REDACTED] - Reused: false - PoolState: (0,Map())
Created Connection - RequestKey: [REDACTED]
Connection Taken - Key: [REDACTED] - Reused: false - PoolState: (0, Map())
Refreshed GCP Pubsub Token with duration 3595 seconds
Refreshed GCP Pubsub Token with duration 3595 seconds
Shutting Down Connection - RequestKey: [REDACTED]
Shutting Down Connection - RequestKey: [REDACTED]
[... + 1h ...]
Created Connection - RequestKey: [REDACTED]
Connection Taken - Key: [REDACTED] - Reused: false - PoolState: (0, Map())
Created Connection - RequestKey: [REDACTED]
Connection Taken - Key: [REDACTED] - Reused: false - PoolState: (0, Map())
Refreshed GCP Pubsub Token with duration 3595 seconds
Refreshed GCP Pubsub Token with duration 3595 seconds
Shutting Down Connection - RequestKey: [REDACTED]
Shutting Down Connection - RequestKey: [REDACTED]
[... loops token refresh every 1h ... ]
Unsure if the way I'm creating the client has any impact?
for {
  client <- EmberClientBuilder
    .default[F]
    .withHttp2
    .build
    .mproduct(client => TokenProvider.serviceAccount(client).pure[F].toResource)
    .map { case (client, tokenProvider) => tokenProvider.clientMiddleware(client) }
} yield {
  given Client[F] = client
  PubSubSubscriber(config, subscription, deserializer, logger)
}
I am struggling to explain the double and then triple Ping.Ack. It suggests there is a duplication of clients/connections that is compounding with each Stream start.
I don't suppose you have any example code of projects using this package I could compare against?
Thanks for all your help so far by the way, really appreciated.
The only difference I see is you're using HTTP2. Does it still happen if you don't use it? The double or triple pings could be explained by your concurrency settings. How are you creating the subscriber?
Read concurrency is set to 1.
Subscriber is created like this
PubSubSubscriber
  .http[F]
  .projectId(config.subscriber.projectId)
  .subscription(Subscription(subscription))
  .uri(config.subscriber.uri)
  .httpClient(client)
  .noRetry
  .errorHandler {
    case (_, t) => t.raiseError
  }
  .batchSize(config.subscriber.batchSize)             // 100
  .maxLatency(config.subscriber.maxLatency)           // 10 seconds
  .readMaxMessages(config.subscriber.readMaxMessages) // 1000
  .readConcurrency(config.subscriber.readConcurrency) // 1
  .raw
I'm trying to see if I can put together a simple reproducer as well.
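Roughly what I have in mind for the reproducer (just a sketch; the URI is a placeholder, and a real reproducer would need a server configured to send GoAway(max_age) after a short connection lifetime):

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import org.http4s.{Method, Request}
import org.http4s.implicits._
import org.http4s.ember.client.EmberClientBuilder

// Sketch: keep POSTing over a single Ember HTTP/2 connection and log what happens
// once the server decides to close it with GoAway.
object GoAwayRepro extends IOApp.Simple {
  val run: IO[Unit] =
    EmberClientBuilder.default[IO].withHttp2.build.use { client =>
      val req = Request[IO](Method.POST, uri"https://example.invalid/grpc-like-endpoint")
      def loop: IO[Unit] =
        client.status(req).attempt
          .flatMap(result => IO.println(s"attempt finished: $result")) >>
          IO.sleep(10.seconds) >> loop
      loop
    }
}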
Over the weekend I left a build running with an HTTP subscriber and HTTP/2 client. Out of 4 pods, 3 had the error occur after 12 hours (all 3 at T+12h); 1 pod ran fine all weekend.
Just deployed a build using http subscriber and http 1 client. Will let you know how it goes.
Switching to http1 seems to have resolved the issue by the way 🎉
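As far as the client builder goes, the HTTP/1 build just doesn't call .withHttp2 (sketch; the token-provider middleware wiring stays as in the earlier snippet):

import cats.effect.IO
import org.http4s.ember.client.EmberClientBuilder

// Sketch: same builder as earlier in the thread, minus .withHttp2,
// so Ember negotiates plain HTTP/1.1 against the Pub/Sub REST endpoint.
val http1ClientResource = EmberClientBuilder.default[IO].build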
When I get some time I'll try and code up a reproducer for the http2 bug
Nice! Yeah, that would be super-useful in order to open an issue on http4s
We're a bit stuck on some weird behaviour in our application.
We're using fs2-pubsub with EmberClient with HTTP/2 support to make gRPC requests to the Google Pub/Sub API.
I.e. a setup something like this:
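(Below is a simplified sketch assembled from the snippets quoted elsewhere in this thread, not the verbatim code.)

for {
  client <- EmberClientBuilder
    .default[F]
    .withHttp2
    .build
    .mproduct(client => TokenProvider.serviceAccount(client).pure[F].toResource)
    .map { case (client, tokenProvider) => tokenProvider.clientMiddleware(client) }
} yield PubSubSubscriber
  .http[F]
  .projectId(config.subscriber.projectId)
  .subscription(Subscription(subscription))
  .uri(config.subscriber.uri)
  .httpClient(client)
  .noRetry
  .batchSize(config.subscriber.batchSize)
  .maxLatency(config.subscriber.maxLatency)
  .readMaxMessages(config.subscriber.readMaxMessages)
  .readConcurrency(config.subscriber.readConcurrency)
  .raw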
We're seeing periodic buildup of un-acked messages
A newly started pod behaves properly for a while, but after 1-2 hours we see these logs
GoAway HTTP/2 frames indicate the server intends to close the connection. The first GoAway's additionalDebugData decodes to max_age. The second GoAway's additionalDebugData decodes to too_many_pings. Notice the two back-to-back Ping.Ack which trigger the GoAway: EnhanceYourCalm. Following these logs, that pod will no longer process any new Pub/Sub messages.
We have (unsuccessfully) tried many things to fix this:
We're a bit confused, but it appears the issue may lie in the interaction between fs2-pubsub and http4s' EmberClient, and in how it handles HTTP/2 connection lifecycle management.