nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0
15.49k stars 1.38k forks source link

nats-server has memory leak for queue group subscriptions #2516

Closed andrey-shalamov closed 2 years ago

andrey-shalamov commented 2 years ago

Defect

nats-server has memory leak for queue group subscriptions

Versions of nats-server and affected client libraries used:

[1] 2021/09/13 13:21:20.541459 [INF] Starting nats-server
[1] 2021/09/13 13:21:20.542067 [INF]   Version:  2.5.0
[1] 2021/09/13 13:21:20.542088 [INF]   Git:      [b3c19b9]
[1] 2021/09/13 13:21:20.542093 [DBG]   Go build: go1.16.8
[1] 2021/09/13 13:21:20.542096 [INF]   Name:     NBB42VDQKNGFZSRPOB7F3UNTP66TVHJKRGVXCJY5KAJWQCXQWTHVVDEN
[1] 2021/09/13 13:21:20.542107 [INF]   Node:     v5AjBWvn
[1] 2021/09/13 13:21:20.542112 [INF]   ID:       NBB42VDQKNGFZSRPOB7F3UNTP66TVHJKRGVXCJY5KAJWQCXQWTHVVDEN
[1] 2021/09/13 13:21:20.542138 [DBG] Created system account: "$SYS"
[1] 2021/09/13 13:21:20.545144 [INF] Starting JetStream
[1] 2021/09/13 13:21:20.548120 [DBG] JetStream creating dynamic configuration - 5.83 GB memory, 171.23 GB disk
[1] 2021/09/13 13:21:20.548421 [INF]     _ ___ _____ ___ _____ ___ ___   _   __  __
[1] 2021/09/13 13:21:20.548555 [INF]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[1] 2021/09/13 13:21:20.548692 [INF] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[1] 2021/09/13 13:21:20.548700 [INF]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[1] 2021/09/13 13:21:20.548702 [INF]
[1] 2021/09/13 13:21:20.548705 [INF]          https://docs.nats.io/jetstream
[1] 2021/09/13 13:21:20.548706 [INF]
[1] 2021/09/13 13:21:20.548709 [INF] ---------------- JETSTREAM ----------------
[1] 2021/09/13 13:21:20.548716 [INF]   Max Memory:      5.83 GB
[1] 2021/09/13 13:21:20.548719 [INF]   Max Storage:     171.23 GB
[1] 2021/09/13 13:21:20.548722 [INF]   Store Directory: "/tmp/nats/jetstream"
[1] 2021/09/13 13:21:20.548724 [INF] -------------------------------------------
[1] 2021/09/13 13:21:20.548848 [DBG]   Exports:
[1] 2021/09/13 13:21:20.548858 [DBG]      $JS.API.>
[1] 2021/09/13 13:21:20.548897 [DBG] Enabled JetStream for account "$G"
[1] 2021/09/13 13:21:20.548928 [DBG]   Max Memory:      -1 B
[1] 2021/09/13 13:21:20.548932 [DBG]   Max Storage:     -1 B
[1] 2021/09/13 13:21:20.549117 [DBG] JetStream state for account "$G" recovered
[1] 2021/09/13 13:21:20.549336 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/09/13 13:21:20.549367 [DBG] Get non local IPs for "0.0.0.0"
[1] 2021/09/13 13:21:20.549561 [DBG]   ip=172.17.0.3
[1] 2021/09/13 13:21:20.549632 [INF] Server is ready

OS/Container environment:

nats:alpine

Steps or code to reproduce the issue:

use next code

package main

import (
    "sync"

    "github.com/nats-io/nats.go"
)

const (
    streamName     = "STREAM"
    streamSubject  = "STREAM.*"
    durableName    = "durable"
    queueGroupName = "queue"
    consumerName   = "consumer"
    subjectName    = "STREAM.foo"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    check(err)
    defer nc.Close()

    js, err := nc.JetStream()
    check(err)
    stream, _ := js.StreamInfo(streamName)
    if stream == nil {
        stream, err = js.AddStream(&nats.StreamConfig{
            Name:     streamName,
            Subjects: []string{streamSubject},
        })
        check(err)
    }
    ci, _ := js.ConsumerInfo(streamName, consumerName)
    if ci == nil {
        ci, err = js.AddConsumer(streamName, &nats.ConsumerConfig{
            AckPolicy:      nats.AckExplicitPolicy,
            Durable:        durableName,
            DeliverSubject: queueGroupName,
            DeliverGroup:   queueGroupName,
            DeliverPolicy:  nats.DeliverNewPolicy,
        })
        check(err)
    }
    wg := sync.WaitGroup{}
    wg.Add(1)
    subOpts := []nats.SubOpt{
        nats.ManualAck(),
        nats.Durable(durableName),
        nats.DeliverSubject(queueGroupName),
        nats.DeliverNew(),
    }
    _, err = js.QueueSubscribe(subjectName, queueGroupName, func(m *nats.Msg) {
        check(m.Ack())
        wg.Done()
    },
        subOpts...)
    check(err)

    _, err = js.Publish(subjectName, []byte("hello"))
    check(err)

    wg.Wait()
}

func check(err error) {
    if err != nil {
        panic(err)
    }
}

and bash script (test.sh)

#!/bin/bash

i=0
while [ "$i" -le "$1" ]; do
    ./nats-leak
    i=$(( i + 1 ))
done

build app

go build -o nats-leak

run script

sh test.sh 30

Expected result:

no memory leak

Actual result:

image

kozlovic commented 2 years ago

Closed by #2517