streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

panic: send on closed channel on broker disconnect #435

Closed by Pitasi 4 years ago

Pitasi commented 4 years ago

Hey there. Thanks for this library! I'm having trouble implementing reconnection when the broker can no longer be reached. It works about half the time; the other half I get this panic:

panic: send on closed channel

goroutine 116 [running]:
github.com/streadway/amqp.(*Connection).shutdown.func1()
        /home/antonio/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20200108173154-1c71cc93ed71/connection.go:395 +0x5bd
sync.(*Once).doSlow(0xc0000ec280, 0xc000108da8)
        /usr/lib/go/src/sync/once.go:66 +0x101
sync.(*Once).Do(0xc0000ec280, 0xc000108da8)
        /usr/lib/go/src/sync/once.go:57 +0x69
github.com/streadway/amqp.(*Connection).shutdown(0xc0000ec280, 0xc00000ea80)
        /home/antonio/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20200108173154-1c71cc93ed71/connection.go:389 +0x92
github.com/streadway/amqp.(*Connection).dispatch0(0xc0000ec280, 0x7ec940, 0xc00000ea20)
        /home/antonio/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20200108173154-1c71cc93ed71/connection.go:451 +0x531
github.com/streadway/amqp.(*Connection).demux(0xc0000ec280, 0x7ec940, 0xc00000ea20)
        /home/antonio/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20200108173154-1c71cc93ed71/connection.go:434 +0x6d
github.com/streadway/amqp.(*Connection).reader(0xc0000ec280, 0x7f40502dcfe8, 0xc000010090)
        /home/antonio/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20200108173154-1c71cc93ed71/connection.go:528 +0x10a
created by github.com/streadway/amqp.Open
        /home/antonio/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20200108173154-1c71cc93ed71/connection.go:233 +0x3eb
exit status 2

Basically, I have a goroutine ranging over a chan *amqp.Error. Every time it receives something, it re-initializes the connection, calls NotifyClose() on the same chan, then re-initializes the channel, exchange, queues, etc.

From the doc:

NotifyClose registers a listener for close events either initiated by an error accompanying a connection.close method or by a normal shutdown. On normal shutdowns, the chan will be closed.

Apparently the chan is sometimes closed even on abnormal shutdowns.
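For future readers, here is a minimal sketch of how reusing one chan across connections could produce this panic. It assumes that a connection, on shutdown, first sends the error to every registered listener and then closes each listener; the stack trace points at a send inside shutdown, but the thread does not confirm the exact behaviour. `fakeConn`, `shutdown` and `reuseListener` are hypothetical stand-ins written for illustration, not the library's types, and `chan error` replaces `chan *amqp.Error` so the example needs only the standard library.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeConn is a hypothetical stand-in for a broker connection.
// Assumption: on shutdown it sends the error to every listener,
// then closes every listener.
type fakeConn struct {
	closes []chan error
}

// NotifyClose registers a listener and returns it.
func (c *fakeConn) NotifyClose(ch chan error) chan error {
	c.closes = append(c.closes, ch)
	return ch
}

// shutdown notifies and closes all listeners. It reports whether
// notifying a listener panicked, instead of crashing the program.
func (c *fakeConn) shutdown(err error) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	if err != nil {
		for _, ch := range c.closes {
			ch <- err // panics if ch was closed by an earlier connection
		}
	}
	for _, ch := range c.closes {
		close(ch)
	}
	return false
}

// reuseListener registers the same chan on two successive connections.
func reuseListener() bool {
	shared := make(chan error, 1)

	first := &fakeConn{}
	first.NotifyClose(shared)
	first.shutdown(errors.New("broker gone")) // sends, then closes shared

	second := &fakeConn{}
	second.NotifyClose(shared) // shared is already closed at this point
	return second.shutdown(errors.New("broker gone again"))
}

func main() {
	fmt.Println("panicked on reused chan:", reuseListener())
	// prints "panicked on reused chan: true"
}
```

Under that assumption the chan ends up closed after every shutdown, not only a normal one, so a second connection holding the same chan has nothing valid to send on.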

Pitasi commented 4 years ago

I was probably doing something weird with channels. I rewrote my logic, taking inspiration from this code: https://github.com/isayme/go-amqp-reconnect/blob/master/rabbitmq/rabbitmq.go and the error is gone.

Leaving this as a reference for the future :smile: