streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License
4.87k stars 620 forks source link

Connection's Heartbeat Is A Goroutine Leak (v1.0.0) #457

Closed houseofcat closed 4 years ago

houseofcat commented 4 years ago

Goroutines are still running after all Channels/Connections are closed; my goroutine leak test fails on the heartbeat goroutine.

--- FAIL: TestCreatePublisherAndPublishWithConfirmation (5.27s)
    leaktest.go:132: leaktest: timed out checking goroutines
    leaktest.go:150: leaktest: leaked goroutine: goroutine 31 [select]:
        github.com/streadway/amqp.(*Connection).heartbeater(0xc0002948c0, 0x12a05f200, 0xc000086420)
            c:/go/pkg/mod/github.com/streadway/amqp@v1.0.0/connection.go:551 +0x260
        created by github.com/streadway/amqp.(*Connection).openTune
            c:/go/pkg/mod/github.com/streadway/amqp@v1.0.0/connection.go:782 +0x83f
    leaktest.go:150: leaktest: leaked goroutine: goroutine 85 [IO wait]:
        internal/poll.runtime_pollWait(0x26dbc958, 0x72, 0xc00014ac98)
            c:/go/src/runtime/netpoll.go:203 +0x65
        internal/poll.(*pollDesc).wait(0xc00014ae48, 0x72, 0xd84300, 0x0, 0x0)
            c:/go/src/internal/poll/fd_poll_runtime.go:87 +0xa2
        internal/poll.(*ioSrv).ExecIO(0xdc8220, 0xc00014ac98, 0xa54930, 0x0, 0x0, 0x0)
            c:/go/src/internal/poll/fd_windows.go:228 +0x2cd
        internal/poll.(*FD).Read(0xc00014ac80, 0xc000235000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
            c:/go/src/internal/poll/fd_windows.go:527 +0x53b
        net.(*netFD).Read(0xc00014ac80, 0xc000235000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
            c:/go/src/net/fd_windows.go:152 +0x94
        net.(*conn).Read(0xc000148098, 0xc000235000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
            c:/go/src/net/net.go:184 +0xbc
        bufio.(*Reader).Read(0xc0000861e0, 0xc00006a002, 0x7, 0x7, 0x7, 0x0, 0x0)
            c:/go/src/bufio/bufio.go:226 +0x409
        io.ReadAtLeast(0xabcb20, 0xc0000861e0, 0xc00006a002, 0x7, 0x7, 0x7, 0x0, 0x0, 0x0)
            c:/go/src/io/io.go:310 +0x194
        io.ReadFull(0xabcb20, 0xc0000861e0, 0xc00006a002, 0x7, 0x7, 0x0, 0x0, 0x0)
            c:/go/src/io/io.go:329 +0xb1
        github.com/streadway/amqp.(*reader).ReadFrame(0xc000255f78, 0x0, 0x0, 0x0, 0x0)
            c:/go/pkg/mod/github.com/streadway/amqp@v1.0.0/read.go:49 +0xfa
        github.com/streadway/amqp.(*Connection).reader(0xc0002948c0, 0x27290038, 0xc000148098)
            c:/go/pkg/mod/github.com/streadway/amqp@v1.0.0/connection.go:521 +0x147
        created by github.com/streadway/amqp.Open
            c:/go/pkg/mod/github.com/streadway/amqp@v1.0.0/connection.go:233 +0x3a2
FAIL

Example Unit Test.

// TestCreatePublisherAndPublishWithConfirmation builds a channel pool and a
// publisher, publishes a single letter with confirmation, shuts the pool down,
// and then waits for either a pool error or a publisher notification.
// leaktest.Check is deferred, so the goroutine-leak check runs when the test
// function returns.
func TestCreatePublisherAndPublishWithConfirmation(t *testing.T) {
    defer leaktest.Check(t)() // Fail on leaked goroutines.

    channelPool, err := pools.NewChannelPool(Seasoning.PoolConfig, nil, true)
    assert.NoError(t, err)

    // NOTE(review): presumably discards errors already queued on the pool so
    // the loop below only sees new ones — confirm against FlushErrors.
    channelPool.FlushErrors()

    // From this line on, the local variable `publisher` shadows the identifier
    // that NewPublisher was called through.
    publisher, err := publisher.NewPublisher(Seasoning, channelPool, nil)
    assert.NoError(t, err)
    assert.NotNil(t, publisher)

    letterID := uint64(1)
    // The payload bytes are not valid UTF-8 (0xFF never appears in UTF-8).
    body := "\xFF\xFF\x89\xFF\xFF"
    envelope := &models.Envelope{
        Exchange:    "",
        RoutingKey:  "ConfirmationTestQueue",
        ContentType: "plain/text",
        Mandatory:   false,
        Immediate:   false,
    }

    letter := &models.Letter{
        LetterID:   letterID,
        RetryCount: uint32(3),
        Body:       []byte(body),
        Envelope:   envelope,
    }

    // The pool is shut down immediately after publishing, before any
    // notification has been read from the publisher.
    publisher.PublishWithConfirmation(letter)
    channelPool.Shutdown()

    // Assert on all Notifications
    // The loop polls both channels without blocking and sleeps 100ms between
    // polls. It exits on the first pool error or the first notification. There
    // is no timeout, so it keeps polling until one of the two arrives.
AssertLoop:
    for {
        select {
        case chanErr := <-channelPool.Errors():
            assert.NoError(t, chanErr) // This test fails on channel errors.
            break AssertLoop
        case notification := <-publisher.Notifications():
            assert.True(t, notification.Success)
            assert.Equal(t, letterID, notification.LetterID)
            assert.NoError(t, notification.Error)
            break AssertLoop
        default:
            time.Sleep(100 * time.Millisecond)
        }
    }

    // NOTE(review): presumably a grace period for background goroutines to
    // exit before the deferred leaktest check runs — confirm.
    time.Sleep(time.Millisecond * 100)
}

Link to the unit test on GitHub: https://github.com/houseofcat/turbocookedrabbit/blob/master/publisher/publisher_test.go

I know I am not directly using a "Connection" or "Channel", but trust me: on shutdown, Close is called on every channel and connection.

@streadway or @michaelklishin let me know if you need help with this.

Additional details: RabbitMQ Server 3.8.3, Erlang 22.3

houseofcat commented 4 years ago

Oddly enough, I identified the issue: it is with the Leaktest utility I am using (parallel tests). Leaving it here in case others find the same problem.

Sorry!