streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License
4.88k stars 621 forks source link

Data race while closing connection after publishing #428

Open pardhasm opened 5 years ago

pardhasm commented 5 years ago

I am publishing messages to rabbitMQ at a small rate (30/s). If I close the connection after the message publishes, it is giving me the below error.

fatal error: concurrent map read and map write

goroutine 10 [running]:
runtime.throw(0x18e7d4f, 0x21)
        /usr/local/Cellar/go/1.13.1/libexec/src/runtime/panic.go:774 +0x72 fp=0xc00005fe28 sp=0xc00005fdf8 pc=0x102fcc2
runtime.mapaccess1(0x17be000, 0xc0000c4690, 0xc00005fe9e, 0x1)
        /usr/local/Cellar/go/1.13.1/libexec/src/runtime/map.go:411 +0x269 fp=0xc00005fe70 sp=0xc00005fe28 pc=0x100f889
github.com/streadway/amqp.(*Connection).dispatchN(0xc0002c8000, 0x1a0ac80, 0xc0004c0f60)
        /Users/pardhasaradhi/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20190827072141-edfb9018d271/connection.go:473 +0x8c fp=0xc00005fec0 sp=0xc00005fe70 pc=0x16ed13c
github.com/streadway/amqp.(*Connection).demux(0xc0002c8000, 0x1a0ac80, 0xc0004c0f60)
        /Users/pardhasaradhi/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20190827072141-edfb9018d271/connection.go:436 +0x87 fp=0xc00005fee8 sp=0xc00005fec0 pc=0x16ecc57
github.com/streadway/amqp.(*Connection).reader(0xc0002c8000, 0x2419078, 0xc000010080)
        /Users/pardhasaradhi/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20190827072141-edfb9018d271/connection.go:528 +0xf4 fp=0xc00005ffc8 sp=0xc00005fee8 pc=0x16ed414
runtime.goexit()
        /usr/local/Cellar/go/1.13.1/libexec/src/runtime/asm_amd64.s:1357 +0x1 fp=0xc00005ffd0 sp=0xc00005ffc8 pc=0x105c891
created by github.com/streadway/amqp.Open
        /Users/pardhasaradhi/go/pkg/mod/github.com/streadway/amqp@v0.0.0-20190827072141-edfb9018d271/connection.go:233 +0x260

My code:

func Publish(name string, body []byte) {
    channel, err := connection.Channel()
    defer channel.Close() // commenting out this does not cause any errors but too many channels are left.
    err := channel.Publish("", name, false, false, amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "text/plain",
        Body:         body,
    })
    util.HandlerError(err, "Could not publish message for mq : "+name)
}