streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Potential goroutine leak #378

Closed (dolftax closed 5 years ago)

dolftax commented 5 years ago

pprof info

20 @ 0x42febb 0x43febd 0x863fc7 0x45df91
#   0x863fc6    github.com/foo/bar/vendor/github.com/streadway/amqp.(*Connection).heartbeater+0x186 /go/src/github.com/foo/bar/vendor/github.com/streadway/amqp/connection.go:546

lukebakken commented 5 years ago

This isn't nearly enough information. To start, code to reproduce this would be helpful to have.

dolftax commented 5 years ago

Hi @lukebakken

Sorry for the delay. The following sample code reproduces the issue.

    // Open channel
    rmqRepoRunChan, err := rmqConn.Channel()
    if err != nil {
        raven.CaptureErrorAndWait(err, nil)
        log.Fatalln("Error opening channel repo-run: ", err)
    }

    defer rmqRepoRunChan.Close()

    // Declare repo run queue
    repoRunQueueDeclare, repoRunQueueDeclareErr := rmqRepoRunChan.QueueDeclare(
        "repo-run", // Queue name
        false,      // Durable
        true,       // Auto-delete (delete when unused)
        false,      // Exclusive
        false,      // No wait
        nil,        // Arguments
    )
    if repoRunQueueDeclareErr != nil {
        raven.CaptureErrorAndWait(repoRunQueueDeclareErr, nil)
        log.Fatalln("Error declaring queue: repo-run", repoRunQueueDeclareErr)
    }

    // Bind repo run queue to atlas-jobs exchange
    repoRunQueueBindErr := rmqRepoRunChan.QueueBind(
        "repo-run",                          // Queue name
        "repo-run",                          // Routing key
        viper.GetString("app.rmq.exchange"), // Exchange name
        false,                               // No wait
        nil,                                 // Arguments
    )
    if repoRunQueueBindErr != nil {
        raven.CaptureErrorAndWait(repoRunQueueBindErr, nil)
        log.Fatalln("Error binding queue: repo-run", repoRunQueueBindErr)
    }

    // Listen for messages to consume from repo-run queue
    repoRunMessages, repoRunConsumeErr := rmqRepoRunChan.Consume(
        repoRunQueueDeclare.Name, // Queue name
        "",                       // Consumer
        true,                     // Auto acknowledge
        false,                    // Exclusive
        false,                    // No local
        false,                    // No wait
        nil,                      // Arguments
    )
    if repoRunConsumeErr != nil {
        raven.CaptureErrorAndWait(repoRunConsumeErr, nil)
        log.Fatalln("Error registering consumer for queue: repo-run", repoRunConsumeErr)
    }

    // Listen for messages on repo-run queue
    go func() {
        for message := range repoRunMessages {
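            run := at.RepoRun{}
            body := bytes.NewReader(message.Body)
            // Skip messages that fail to decode instead of logging a zero value.
            if err := json.NewDecoder(body).Decode(&run); err != nil {
                log.Printf("Error decoding repo run message: %v", err)
                continue
            }

            log.Printf("Repo run received %v", run)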
        }
    }()

    log.Println("Consumers initialized for queue: repo-run")

Goroutine trace:

==== BEFORE a message is consumed ====

goroutine profile: total 34
7 @ 0x42ff7b 0x42b2a9 0x42a956 0x4981da 0x4982ed 0x499039 0x53c10f 0x54f798 0x56bbef 0x4755f8 0x475798 0x869b0c 0x866608 0x45e161
#   0x42a955    internal/poll.runtime_pollWait+0x65                             /usr/local/go/src/runtime/netpoll.go:173
#   0x4981d9    internal/poll.(*pollDesc).wait+0x99                             /usr/local/go/src/internal/poll/fd_poll_runtime.go:85
#   0x4982ec    internal/poll.(*pollDesc).waitRead+0x3c                             /usr/local/go/src/internal/poll/fd_poll_runtime.go:90
#   0x499038    internal/poll.(*FD).Read+0x178                                  /usr/local/go/src/internal/poll/fd_unix.go:169
#   0x53c10e    net.(*netFD).Read+0x4e                                      /usr/local/go/src/net/fd_unix.go:202
#   0x54f797    net.(*conn).Read+0x67                                       /usr/local/go/src/net/net.go:177
#   0x56bbee    bufio.(*Reader).Read+0x22e                                  /usr/local/go/src/bufio/bufio.go:216
#   0x4755f7    io.ReadAtLeast+0x87                                     /usr/local/go/src/io/io.go:310
#   0x475797    io.ReadFull+0x57                                        /usr/local/go/src/io/io.go:329
#   0x869b0b    github.com/foo/bar/vendor/github.com/streadway/amqp.(*reader).ReadFrame+0x6b    /go/src/github.com/foo/bar/vendor/github.com/streadway/amqp/read.go:49
#   0x866607    github.com/foo/bar/vendor/github.com/streadway/amqp.(*Connection).reader+0xc7   /go/src/github.com/foo/bar/vendor/github.com/streadway/amqp/connection.go:516

7 @ 0x42ff7b 0x43fff6 0x8669e7 0x45e161
#   0x8669e6    github.com/foo/bar/vendor/github.com/streadway/amqp.(*Connection).heartbeater+0x186 /go/src/github.com/foo/bar/vendor/github.com/streadway/amqp/connection.go:546

==== AFTER a message is consumed ====

goroutine profile: total 36
8 @ 0x42ff7b 0x42b2a9 0x42a956 0x4981da 0x4982ed 0x499039 0x53c10f 0x54f798 0x56bbef 0x4755f8 0x475798 0x869b0c 0x866608 0x45e161
#   0x42a955    internal/poll.runtime_pollWait+0x65                             /usr/local/go/src/runtime/netpoll.go:173
#   0x4981d9    internal/poll.(*pollDesc).wait+0x99                             /usr/local/go/src/internal/poll/fd_poll_runtime.go:85
#   0x4982ec    internal/poll.(*pollDesc).waitRead+0x3c                             /usr/local/go/src/internal/poll/fd_poll_runtime.go:90
#   0x499038    internal/poll.(*FD).Read+0x178                                  /usr/local/go/src/internal/poll/fd_unix.go:169
#   0x53c10e    net.(*netFD).Read+0x4e                                      /usr/local/go/src/net/fd_unix.go:202
#   0x54f797    net.(*conn).Read+0x67                                       /usr/local/go/src/net/net.go:177
#   0x56bbee    bufio.(*Reader).Read+0x22e                                  /usr/local/go/src/bufio/bufio.go:216
#   0x4755f7    io.ReadAtLeast+0x87                                     /usr/local/go/src/io/io.go:310
#   0x475797    io.ReadFull+0x57                                        /usr/local/go/src/io/io.go:329
#   0x869b0b    github.com/foo/bar/vendor/github.com/streadway/amqp.(*reader).ReadFrame+0x6b    /go/src/github.com/foo/bar/vendor/github.com/streadway/amqp/read.go:49
#   0x866607    github.com/foo/bar/vendor/github.com/streadway/amqp.(*Connection).reader+0xc7   /go/src/github.com/foo/bar/vendor/github.com/streadway/amqp/connection.go:516

8 @ 0x42ff7b 0x43fff6 0x8669e7 0x45e161
#   0x8669e6    github.com/foo/bar/vendor/github.com/streadway/amqp.(*Connection).heartbeater+0x186 /go/src/github.com/foo/bar/vendor/github.com/streadway/amqp/connection.go:546
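
For reference, a text goroutine profile in the format shown above can be captured with the standard library's `runtime/pprof` package. The following is a minimal sketch, not the reporter's code; the helper name `goroutineProfile` is hypothetical and exists only for illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
)

// goroutineProfile returns the text-format goroutine profile of the
// current process. Hypothetical helper, for illustration only.
func goroutineProfile() (string, error) {
	var buf bytes.Buffer
	// debug=1 writes the aggregated text form: a "goroutine profile: total N"
	// header followed by one "count @ addresses" entry per unique stack.
	if err := pprof.Lookup("goroutine").WriteTo(&buf, 1); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	profile, err := goroutineProfile()
	if err != nil {
		fmt.Println("error capturing profile:", err)
		return
	}
	fmt.Print(profile)
}
```

Capturing the profile several times while the program keeps running, rather than once before and once after a single message, makes it possible to compare the per-stack counts over time.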

michaelklishin commented 5 years ago

This is not evidence of a leak. To demonstrate a leak, you have to run the code continuously and observe an ever-growing number of goroutines. A small one-time increase proves nothing: your own code creates a goroutine, for example.

We are not aware of any outstanding issues with leaks, so please provide more evidence than this example.
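
The check described above, running the code repeatedly and watching whether the goroutine count keeps growing, can be sketched as follows. This is a minimal illustration using only the standard library, not code from this thread; `goroutineGrowth`, `leakyWork`, and `cleanWork` are hypothetical names, and the two work functions merely stand in for the code under test.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// goroutineGrowth runs work n times and returns the goroutine count
// sampled after each iteration. Hypothetical helper, for illustration only.
func goroutineGrowth(n int, work func()) []int {
	counts := make([]int, 0, n)
	for i := 0; i < n; i++ {
		work()
		// Give goroutines that are about to exit a moment to do so.
		time.Sleep(10 * time.Millisecond)
		counts = append(counts, runtime.NumGoroutine())
	}
	return counts
}

// leakyWork starts a goroutine that blocks forever on a channel
// that nothing ever writes to or closes.
func leakyWork() {
	ch := make(chan struct{})
	go func() { <-ch }()
}

// cleanWork starts a goroutine and waits for it to finish.
func cleanWork() {
	done := make(chan struct{})
	go func() { close(done) }()
	<-done
}

func main() {
	fmt.Println("leaky:", goroutineGrowth(5, leakyWork))
	fmt.Println("clean:", goroutineGrowth(5, cleanWork))
}
```

A count that rises with every iteration, as in the leaky case, points to a leak; a count that settles at a constant value after startup, as in the clean case, does not.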