Unclean shutdown when using single-peer with gRPC #2241

Open Groxx opened 6 months ago

Groxx commented 6 months ago

When converting some internal tests to zaptest loggers, I started getting occasional test panics like:

panic: Log in goroutine after TestName has completed: 2024-01-02T19:16:29.853Z DEBUG   peer status change  {"status": "Unavailable", "peer": "127.0.0.1:1234", "transport": "grpc"}
goroutine 239 [running]:
testing.(*common).logDepth(0xc0007dc000, {0xc00021f500, 0x7a}, 0x3)
    GOROOT/src/testing/testing.go:1022 +0x4c5
testing.(*common).log(...)
    GOROOT/src/testing/testing.go:1004
testing.(*common).Logf(0xc0007dc000, {0x6aa777?, 0x18e30c5?}, {0xc0001ad3f0?, 0x32fa00?, 0x1?})
    GOROOT/src/testing/testing.go:1055 +0x54
go.uber.org/zap/zaptest.testingWriter.Write({{0x974b18?, 0xc0007dc000?}, 0xaa?}, {0xc0008f2c00?, 0x7b, 0xc0001ad3e0?})
    external/org_uber_go_zap/zaptest/logger.go:130 +0xdc
go.uber.org/zap/zapcore.(*ioCore).Write(0xc000708780, {0xff, {0xc15d362372e443f0, 0x687c687, 0x326cf60}, {0x0, 0x0}, {0x6c2592, 0x12}, {0x0, ...}, ...}, ...)
    external/org_uber_go_zap/zapcore/core.go:99 +0xb5
go.uber.org/zap/zapcore.(*CheckedEntry).Write(0xc0000280d0, {0xc0002a26c0, 0x3, 0x3})
    external/org_uber_go_zap/zapcore/entry.go:253 +0x1dc
go.uber.org/zap.(*Logger).Debug(0x0?, {0x6c2592?, 0xc0001a0640?}, {0xc0002a26c0, 0x3, 0x3})
    external/org_uber_go_zap/logger.go:238 +0x51
go.uber.org/yarpc/transport/grpc.(*grpcPeer).setConnectionStatus(0xc0007309c0, 0x0)
    external/org_uber_go_yarpc/transport/grpc/peer.go:98 +0x272
go.uber.org/yarpc/transport/grpc.(*grpcPeer).monitorConnectionStatus(0xc0007309c0)
    external/org_uber_go_yarpc/transport/grpc/peer.go:90 +0x95
created by go.uber.org/yarpc/transport/grpc.(*Transport).newPeer in goroutine 236
    external/org_uber_go_yarpc/transport/grpc/peer.go:73 +0x6cb

After digging around a bit, I can see we are using some single-peer choosers with grpc, and: peer.Single.Stop() releases its peer via the transport's ReleasePeer(), and grpc's ReleasePeer() calls the peer's stop() without waiting for its monitor goroutine to exit.

So if shutdown calls peer.Single.Stop() and then grpc.Transport.Stop(), the peer will be removed after having only been told to stop(), and the transport's Stop() will not wait for its background goroutine to finish.
I'm not 100% certain that shutdown occurs in this order (the fx logs don't make it explicit), but it seems like it probably has to, as peers are used in outbounds. Stopping RPC == stop outbounds -> stop peers -> stop transports, right?
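
To make the gap concrete, here's a tiny self-contained model of that stop()/wait() split (names here are made up for illustration, not the real peer.go):

package main

import "fmt"

// peerModel mimics the split in transport/grpc/peer.go: stop() only
// signals, and only wait() blocks until the monitor goroutine has exited.
type peerModel struct {
	stopC chan struct{} // closed by stop()
	doneC chan struct{} // closed when the monitor goroutine returns
}

func newPeerModel() *peerModel {
	p := &peerModel{stopC: make(chan struct{}), doneC: make(chan struct{})}
	go p.monitor()
	return p
}

func (p *peerModel) monitor() {
	defer close(p.doneC)
	<-p.stopC
	// In the real code this is roughly where setConnectionStatus logs
	// "peer status change" -- possibly after the test has already returned.
	fmt.Println("peer status change")
}

func (p *peerModel) stop() { close(p.stopC) }
func (p *peerModel) wait() { <-p.doneC }

func main() {
	p := newPeerModel()
	p.stop() // all that ReleasePeer does today
	// Transport.Stop() only wait()s on peers still in addressToPeer, so a
	// released peer's goroutine can outlive shutdown unless someone waits:
	p.wait()
}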


I'm not seeing any way to patch this from the outside, as the peer instance and its API don't seem to be exposed anywhere. Which is probably a good thing. So I think this has to be fixed internally.

As a possibly simple option: maybe grpc.Transport should just keep all stop-chans (remove the peer but not the chan in ReleasePeer) and wait on all of them during Stop()? It would leak empty chans unless some cleanup process ran, but if that's an issue then already-closed chans could probably be swept out in ReleasePeer as a garbage collector (rough sketch below).
Or should ReleasePeer just wait too? I'm not sure what the semantics are here, but it seems like it may be intentional that it doesn't wait.
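
To illustrate that first option, a sketch of the chan-keeping plus garbage collection (the type and names are hypothetical, not yarpc internals, and the real Transport would need to do all of this under its lock):

package grpcsketch

// transportSketch stands in for grpc.Transport: it keeps one done-chan per
// released peer, each closed when that peer's monitor goroutine exits.
type transportSketch struct {
	peerDone []chan struct{}
}

// sweep drops chans whose goroutines have already exited, so the slice
// doesn't grow without bound across many retain/release cycles. ReleasePeer
// could call this each time it appends a new chan.
func (t *transportSketch) sweep() {
	alive := t.peerDone[:0]
	for _, done := range t.peerDone {
		select {
		case <-done: // closed: goroutine already gone, safe to forget
		default:
			alive = append(alive, done)
		}
	}
	t.peerDone = alive
}

// waitAll blocks until every outstanding goroutine has exited; Stop() would
// call this after waiting on the peers still in addressToPeer.
func (t *transportSketch) waitAll() {
	for _, done := range t.peerDone {
		<-done
	}
}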

I haven't carefully checked the other transports to see if they have similar issues, but e.g. http is sufficiently different that it doesn't obviously have the same problem.
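
For reference, this is roughly how the failure can be reproduced in isolation (a sketch rather than our real test; I'm assuming grpc.Logger is the transport option for injecting a zap logger, and nothing needs to be listening on the address):

package grpc_test

import (
	"testing"

	"go.uber.org/yarpc/api/peer"
	"go.uber.org/yarpc/peer/hostport"
	"go.uber.org/yarpc/transport/grpc"
	"go.uber.org/zap/zaptest"
)

// nopSub is a no-op subscriber so we can retain/release a peer directly,
// standing in for what peer.Single does.
type nopSub struct{}

func (nopSub) NotifyStatusChanged(peer.Identifier) {}

func TestGRPCPeerShutdownRace(t *testing.T) {
	trans := grpc.NewTransport(grpc.Logger(zaptest.NewLogger(t)))
	if err := trans.Start(); err != nil {
		t.Fatal(err)
	}

	sub := nopSub{}
	id := hostport.Identify("127.0.0.1:1234")
	if _, err := trans.RetainPeer(id, sub); err != nil {
		t.Fatal(err)
	}

	// Mirrors the shutdown order above: release the peer (which only calls
	// its stop()), then stop the transport, which no longer sees the peer.
	if err := trans.ReleasePeer(id, sub); err != nil {
		t.Fatal(err)
	}
	if err := trans.Stop(); err != nil {
		t.Fatal(err)
	}
	// The test returns here, but the peer's monitor goroutine may still log
	// "peer status change", tripping zaptest's log-after-test panic.
}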

Groxx commented 6 months ago

Yea, applying this patch brings me from ~20% failure to zero after thousands of iterations:

diff --git a/transport/grpc/transport.go b/transport/grpc/transport.go
index 15d69460..2c7a2c32 100644
--- a/transport/grpc/transport.go
+++ b/transport/grpc/transport.go
@@ -39,6 +39,7 @@ type Transport struct {
    once          *lifecycle.Once
    options       *transportOptions
    addressToPeer map[string]*grpcPeer
+   waitPeers     []*grpcPeer
 }

 // NewTransport returns a new Transport.
@@ -71,6 +72,9 @@ func (t *Transport) Stop() error {
        for _, grpcPeer := range t.addressToPeer {
            grpcPeer.wait()
        }
+       for _, stoppedGrpcPeer := range t.waitPeers {
+           stoppedGrpcPeer.wait()
+       }
        return nil
    })
 }
@@ -144,6 +148,7 @@ func (t *Transport) ReleasePeer(pid peer.Identifier, ps peer.Subscriber) error {
    if p.NumSubscribers() == 0 {
        delete(t.addressToPeer, address)
        p.stop()
+       t.waitPeers = append(t.waitPeers, p)
    }
    return nil
 }

I have no idea if this^ is worth using; I'm not familiar enough with the code/expectations in here. But it's an effective proof of concept at least.