envoyproxy / go-control-plane

Go implementation of data-plane-api
Apache License 2.0

Goroutine leak in DeltaStreamHandler #913

Closed. zhiyanfoo closed this issue 2 months ago.

zhiyanfoo commented 2 months ago

go-control-plane version: 2259f2656a1f9c0622c315e58ad1e93338373ba5

In our control-plane implementation, we are seeing a significant memory leak in specific cases. On closer inspection, the memory leak can be attributed to a goroutine leak.

[Screenshot from 2024-04-04, 2:20:25 PM]

Looking at profiles, we see far more goroutines originating in the DeltaStreamHandler method than there are active streams.
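One way to attribute live goroutines to a function, similar to what a goroutine profile (for example `/debug/pprof/goroutine?debug=1` from `net/http/pprof`) shows, is to dump every goroutine stack with `runtime.Stack` and count the stacks that mention that function. This is a minimal, standard-library-only sketch; `countGoroutinesIn`, `leakyWorker` and `startLeakyWorkers` are hypothetical helpers, not part of go-control-plane. Against a real server you would pass a substring of the handler's name, such as `DeltaStreamHandler`, and compare the result with the number of open streams.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
)

// countGoroutinesIn returns how many live goroutines have a stack that
// mentions fn. It snapshots all goroutine stacks with runtime.Stack and
// counts the per-goroutine blocks that contain fn.
func countGoroutinesIn(fn string) int {
	buf := make([]byte, 1<<16)
	for {
		n := runtime.Stack(buf, true)
		if n < len(buf) {
			buf = buf[:n]
			break
		}
		buf = make([]byte, 2*len(buf)) // dump may be truncated; retry with a bigger buffer
	}
	count := 0
	// In the all-goroutines dump, goroutines are separated by blank lines.
	for _, g := range strings.Split(string(buf), "\n\n") {
		if strings.Contains(g, fn) {
			count++
		}
	}
	return count
}

// leakyWorker is a hypothetical stand-in for a stuck stream goroutine: it
// reports that it is running, then blocks forever on a send nobody receives.
func leakyWorker(started chan<- struct{}, stuck chan<- int) {
	started <- struct{}{}
	stuck <- 1 // never completes: there is no receiver
}

// startLeakyWorkers spawns n leakyWorker goroutines and returns once all of
// them are running inside leakyWorker.
func startLeakyWorkers(n int) {
	started := make(chan struct{})
	stuck := make(chan int)
	for i := 0; i < n; i++ {
		go leakyWorker(started, stuck)
	}
	for i := 0; i < n; i++ {
		<-started
	}
}

func main() {
	startLeakyWorkers(3)
	fmt.Println("goroutines in leakyWorker:", countGoroutinesIn(".leakyWorker("))
}
```

Matching on `.leakyWorker(` (package-qualified name plus the opening parenthesis of the frame) avoids also counting the `created by` line of the spawning function.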

This occurs when an Envoy client uses the delta xDS protocol over ADS against a snapshot cache.

The request-reading goroutine in DeltaStreamHandler can leak when s.processDelta returns an error.

Here is an example of how the goroutine leaks.

  1. Two or more requests are made on the same stream (as is the case when using ADS).
  2. The goroutine sends the first request to reqCh, where processDelta reads it.
  3. The goroutine's for-loop continues, receives the next request, and blocks sending it on the (unbuffered) channel.
  4. While processing the first request, processDelta gets an error (e.g. from a callback) and returns, so nothing reads from the request channel anymore.
  5. The goroutine remains blocked on the channel send. Because processDelta has returned, there will never be another read from reqCh, and the goroutine leaks.
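The sequence above can be reproduced without gRPC. The sketch below is a hypothetical model, not go-control-plane code: `request` stands in for a delta discovery request, `fakeRecv` for `str.Recv()`, `processOne` for a `processDelta` that fails on the first request, and `leakyRecvLoop` for the original loop that forwards each request with a plain, unconditional send on an unbuffered channel.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// request is a hypothetical stand-in for a delta discovery request.
type request struct{ id int }

// fakeRecv returns a hypothetical Recv function that yields reqs in order,
// then io.EOF, like a client that sent them all on one (ADS) stream.
func fakeRecv(reqs ...request) func() (request, error) {
	i := 0
	return func() (request, error) {
		if i == len(reqs) {
			return request{}, io.EOF
		}
		i++
		return reqs[i-1], nil
	}
}

// leakyRecvLoop models the pre-fix loop: a plain send on reqCh.
// It closes exited when it returns, so callers can observe whether it exits.
func leakyRecvLoop(recv func() (request, error), reqCh chan<- request, exited chan<- struct{}) {
	defer close(exited)
	for {
		req, err := recv()
		if err != nil {
			close(reqCh)
			return
		}
		reqCh <- req // blocks forever once the consumer has stopped reading
	}
}

// processOne models processDelta failing: it reads one request, then
// returns an error as if a callback had rejected it.
func processOne(reqCh <-chan request) error {
	req, ok := <-reqCh
	if !ok {
		return errors.New("stream closed")
	}
	return fmt.Errorf("callback rejected request %d", req.id)
}

// leaks runs steps 1-5 and reports whether the receive goroutine is still
// alive after the consumer has returned.
func leaks() bool {
	reqCh := make(chan request) // unbuffered, as in the handler
	exited := make(chan struct{})
	go leakyRecvLoop(fakeRecv(request{1}, request{2}), reqCh, exited)

	fmt.Println("process returned:", processOne(reqCh))
	select {
	case <-exited:
		return false
	case <-time.After(100 * time.Millisecond):
		return true // stuck on `reqCh <- req` for request 2, and it never will exit
	}
}

func main() {
	fmt.Println("goroutine leaked:", leaks())
}
```

The timeout only bounds how long the check waits: once request 2 is pending on `reqCh` and the consumer has returned, no code path can ever complete that send.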

A possible fix is the following:

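for {
    req, err := str.Recv()
    if err != nil {
        // The stream ended (EOF or transport error): tell processDelta.
        close(reqCh)
        return
    }

    // Don't block unconditionally: if processDelta has already returned,
    // nobody will read reqCh, but the stream context will be cancelled.
    select {
    case reqCh <- req:
    case <-str.Context().Done():
        close(reqCh)
        return
    }
}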

This relies on the fact that if processDelta returns an error, the stream will be closed, so str.Context().Done() will fire in this scenario and unblock the send.
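A minimal model of the proposed fix, under the same assumption the issue makes: when the handler returns, the stream's context is cancelled (in gRPC-Go the server stream context is cancelled once the RPC completes). Here `fakeStream` is a hypothetical stand-in exposing `Recv()` and `Context()`, `cancel()` plays the part of gRPC tearing the stream down, and `fixedRecvLoop` is the loop above with an extra `exited` channel added only so the result can be observed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

// request is a hypothetical stand-in for a delta discovery request.
type request struct{ id int }

// fakeStream is a hypothetical stand-in for the gRPC stream: it yields the
// queued requests, then io.EOF, and exposes a cancellable context.
type fakeStream struct {
	ctx  context.Context
	reqs []request
}

func (s *fakeStream) Recv() (request, error) {
	if len(s.reqs) == 0 {
		return request{}, io.EOF
	}
	req := s.reqs[0]
	s.reqs = s.reqs[1:]
	return req, nil
}

func (s *fakeStream) Context() context.Context { return s.ctx }

// fixedRecvLoop applies the proposed fix: the send on reqCh also selects on
// the stream context, so the goroutine exits once the stream is torn down.
func fixedRecvLoop(str *fakeStream, reqCh chan<- request, exited chan<- struct{}) {
	defer close(exited)
	for {
		req, err := str.Recv()
		if err != nil {
			close(reqCh)
			return
		}
		select {
		case reqCh <- req:
		case <-str.Context().Done():
			close(reqCh)
			return
		}
	}
}

// processOne models processDelta failing on the first request.
func processOne(reqCh <-chan request) error {
	req, ok := <-reqCh
	if !ok {
		return errors.New("stream closed")
	}
	return fmt.Errorf("callback rejected request %d", req.id)
}

// releasedAfterError reports whether the receive goroutine exits after the
// consumer fails and the stream context is cancelled.
func releasedAfterError() bool {
	ctx, cancel := context.WithCancel(context.Background())
	str := &fakeStream{ctx: ctx, reqs: []request{{1}, {2}}}
	reqCh := make(chan request) // unbuffered, as in the handler
	exited := make(chan struct{})
	go fixedRecvLoop(str, reqCh, exited)

	fmt.Println("process returned:", processOne(reqCh))
	cancel() // models gRPC cancelling the stream context when the handler returns

	select {
	case <-exited:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("goroutine released:", releasedAfterError())
}
```

Closing reqCh in the `Done()` branch is safe here because the only sender is this goroutine, and after it returns nothing else sends on the channel.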

This is similar to what the SotW (state-of-the-world) stream handler already does (we could also reuse that implementation instead):

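    go func() {
        // Closing reqCh on every exit path tells the consumer no more requests are coming.
        defer close(reqCh)
        for {
            req, err := stream.Recv()
            if err != nil {
                return
            }
            select {
            case reqCh <- req:
            case <-stream.Context().Done(): // the stream was torn down
                return
            case <-s.ctx.Done(): // the server is shutting down
                return
            }
        }
    }()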
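The SotW-style loop differs in two ways: `defer close(reqCh)` covers every exit path, and a third `select` case also gives up the send when `s.ctx` is done (presumably the server-wide context). The sketch below models both behaviours; `recvInto`, `fakeRecv`, `drainCount` and `exitsOnServerShutdown` are hypothetical names, and `serverCtx` stands in for `s.ctx`.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"time"
)

// request is a hypothetical stand-in for a discovery request.
type request struct{ id int }

// fakeRecv returns a hypothetical Recv that yields n requests, then io.EOF.
func fakeRecv(n int) func() (request, error) {
	sent := 0
	return func() (request, error) {
		if sent == n {
			return request{}, io.EOF
		}
		sent++
		return request{id: sent}, nil
	}
}

// recvInto follows the shape of the SotW loop: reqCh is closed on every exit,
// and a pending send is abandoned if either context is done.
func recvInto(streamCtx, serverCtx context.Context, recv func() (request, error), reqCh chan<- request) {
	defer close(reqCh)
	for {
		req, err := recv()
		if err != nil {
			return
		}
		select {
		case reqCh <- req:
		case <-streamCtx.Done():
			return
		case <-serverCtx.Done():
			return
		}
	}
}

// drainCount counts what a consumer sees; its range loop ends only because
// recvInto's deferred close runs when Recv returns io.EOF.
func drainCount(n int) int {
	reqCh := make(chan request)
	go recvInto(context.Background(), context.Background(), fakeRecv(n), reqCh)
	count := 0
	for range reqCh {
		count++
	}
	return count
}

// exitsOnServerShutdown checks that cancelling only the server context
// releases a goroutine blocked on a send that nobody will receive.
func exitsOnServerShutdown() bool {
	serverCtx, shutdown := context.WithCancel(context.Background())
	reqCh := make(chan request) // deliberately never read
	exited := make(chan struct{})
	go func() {
		recvInto(context.Background(), serverCtx, fakeRecv(1), reqCh)
		close(exited)
	}()
	shutdown()
	select {
	case <-exited:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("requests drained:", drainCount(2))
	fmt.Println("released on server shutdown:", exitsOnServerShutdown())
}
```

`context.Background().Done()` returns a nil channel, so in `exitsOnServerShutdown` the stream-context case can never fire and only the server shutdown releases the goroutine.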