nats-io / stan.go

NATS Streaming System
https://nats.io
Apache License 2.0

Memory leak when subscribe fails #365

Closed: oncicaradupopovici closed this issue 2 years ago

oncicaradupopovici commented 2 years ago

When a subscribe call fails, the subscription object is not removed from the subMap, which causes a memory leak. The problem is worse than leaking a few sub objects when the provided subscription cb closes over other resources. In our case the subscription cb closes over some gRPC streams, so when a stan subscription times out, the whole stream object graph is retained.
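To make the failure mode concrete, here is a minimal, self-contained sketch of the pattern described above: an entry is registered in a map before a fallible setup step, and it has to be removed again if that step fails. The names `registry`, `subscription`, `setup` and `errSubscribeFailed` are hypothetical stand-ins, not stan.go's actual types, and this is not necessarily how the library fixed the issue.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// subscription is a hypothetical stand-in for the library's subscription type.
// cb may close over large objects (for example gRPC streams), so a leaked
// subscription keeps that whole object graph reachable.
type subscription struct {
	inbox string
	cb    func(msg string)
}

// registry is a hypothetical stand-in for the connection and its subMap.
type registry struct {
	mu   sync.Mutex
	subs map[string]*subscription
}

var errSubscribeFailed = errors.New("subscribe failed")

func newRegistry() *registry {
	return &registry{subs: make(map[string]*subscription)}
}

// subscribe registers the subscription first, then runs the fallible setup
// step. If setup fails, the deferred cleanup deletes the map entry so the
// callback and everything it captures become collectable.
func (r *registry) subscribe(inbox string, cb func(string), setup func() error) (sub *subscription, err error) {
	sub = &subscription{inbox: inbox, cb: cb}

	r.mu.Lock()
	r.subs[inbox] = sub
	r.mu.Unlock()

	defer func() {
		if err != nil {
			r.mu.Lock()
			delete(r.subs, inbox)
			r.mu.Unlock()
		}
	}()

	if err = setup(); err != nil {
		return nil, err
	}
	return sub, nil
}

// size reports how many subscriptions are currently registered.
func (r *registry) size() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.subs)
}

func main() {
	r := newRegistry()

	// Simulate a subscribe request that fails (for example, a timeout).
	_, err := r.subscribe("inbox-1", func(string) {}, func() error { return errSubscribeFailed })
	fmt.Println("error:", err, "registered:", r.size())

	// A successful subscribe keeps its entry.
	_, err = r.subscribe("inbox-2", func(string) {}, func() error { return nil })
	fmt.Println("error:", err, "registered:", r.size())
}
```

Without the deferred `delete`, the first call would leave `inbox-1` in the map indefinitely, which matches the retained allocations at the `sc.subMap[sub.inbox] = sub` line in the profile.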

D:\>go tool pprof http://localhost:6060/debug/pprof/heap
Fetching profile over HTTP from http://localhost:6060/debug/pprof/heap
Saved profile in C:\Users\rpopovici\pprof\pprof.alloc_objects.alloc_space.inuse_objects.inuse_space.020.pb.gz
Type: inuse_space
Time: Jul 28, 2022 at 12:03pm (EEST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 208.49MB, 73.35% of 284.25MB total
Dropped 29 nodes (cum <= 1.42MB)
Showing top 10 nodes out of 91
      flat  flat%   sum%        cum   cum%
   81.52MB 28.68% 28.68%   126.02MB 44.33%  google.golang.org/grpc/internal/transport.(*http2Server).operateHeaders
   18.50MB  6.51% 35.19%    18.50MB  6.51%  golang.org/x/net/http2/hpack.(*Decoder).readString
   18.46MB  6.49% 41.68%    40.71MB 14.32%  github.com/nats-io/stan%2ego.(*conn).subscribe
   15.50MB  5.45% 47.13%    15.50MB  5.45%  context.WithValue
   14.50MB  5.10% 52.24%   112.72MB 39.66%  google.golang.org/grpc.(*Server).processStreamingRPC
      14MB  4.93% 57.16%       14MB  4.93%  google.golang.org/grpc/internal/transport.newRecvBuffer (inline)
   12.50MB  4.40% 61.56%    12.50MB  4.40%  runtime.malg
      12MB  4.22% 65.78%    17.50MB  6.16%  github.com/nats-io/nats%2ego.(*Conn).subscribeLocked
   11.50MB  4.05% 69.83%    11.50MB  4.05%  google.golang.org/grpc/internal/transport.newWriteQuota (inline)
      10MB  3.52% 73.35%       10MB  3.52%  context.WithCancel
(pprof) list stan%2ego.\(\*conn\).subscribe
Total: 479.28MB
ROUTINE ======================== github.com/nats-io/stan%2ego.(*conn).subscribe in C:\Users\rpopovici\go\pkg\mod\github.com\nats-io\stan.go@v0.10.2\sub.go
   30.87MB    64.62MB (flat, cum) 13.48% of Total
         .          .    235:   return sc.subscribe(subject, qgroup, cb, options...)
         .          .    236:}
         .          .    237:
         .          .    238:// subscribe will perform a subscription with the given options to the NATS Streaming cluster.
         .          .    239:func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...SubscriptionOption) (Subscription, error) {
      19MB    21.50MB    240:   sub := &subscription{subject: subject, qgroup: qgroup, inbox: nats.NewInbox(), cb: cb, sc: sc, opts: DefaultSubscriptionOptions}
         .          .    241:   for _, opt := range options {
         .          .    242:           if err := opt(&sub.opts); err != nil {
         .          .    243:                   return nil, err
         .          .    244:           }
         .          .    245:   }
         .          .    246:   sc.Lock()
         .          .    247:   if sc.closed {
         .          .    248:           sc.Unlock()
         .          .    249:           return nil, ErrConnectionClosed
         .          .    250:   }
         .          .    251:
         .          .    252:   // Register subscription.
   10.36MB    10.36MB    253:   sc.subMap[sub.inbox] = sub
         .          .    254:   sc.Unlock()
         .          .    255:
         .          .    256:   // Hold lock throughout.
         .          .    257:   sub.Lock()
         .          .    258:   defer sub.Unlock()
         .          .    259:
         .          .    260:   // sc.nc is immutable and never nil once connection is created.
         .          .    261:
         .          .    262:   // Listen for actual messages.
    1.50MB    28.50MB    263:   nsub, err := sc.nc.Subscribe(sub.inbox, sc.processMsg)
         .          .    264:   if err != nil {
         .          .    265:           return nil, err
         .          .    266:   }
         .          .    267:   nsub.SetPendingLimits(-1, -1)
         .          .    268:   sub.inboxSub = nsub
         .          .    269: