streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Use atomic operation to prevent deadlock when publishing in confirm mode #488

Open RainJoe opened 3 years ago

RainJoe commented 3 years ago

When a producer publishes a message and waits for its confirmation in the same goroutine, a deadlock can occur. In confirms.go, the Publish method can wait forever for the lock held by the One method, because One keeps the lock while it is blocked sending on the confirm channel. So I'd like to use an atomic operation in Publish to avoid taking the lock.
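To illustrate the idea, here is a minimal sketch using only the standard library. The `tracker` type, its fields, and `demo` are hypothetical stand-ins, not the library's actual code; the assumption is only that Publish hands out the next delivery tag and One forwards a confirmation while holding a mutex.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// confirmation is a stand-in for amqp.Confirmation.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// tracker is a hypothetical, stripped-down model of the confirm bookkeeping.
type tracker struct {
	published uint64 // accessed only through sync/atomic
	m         sync.Mutex
	listener  chan confirmation
	held      chan struct{} // demo-only: closed once One holds m
}

// Publish hands out the next delivery tag without taking m.
func (t *tracker) Publish() uint64 {
	return atomic.AddUint64(&t.published, 1)
}

// One forwards a confirmation while holding m; the send blocks
// until somebody receives from listener.
func (t *tracker) One(c confirmation) {
	t.m.Lock()
	defer t.m.Unlock()
	close(t.held)
	t.listener <- c
}

// demo publishes twice and only then reads the first confirmation,
// all from the same goroutine.
func demo() (second uint64, got confirmation) {
	t := &tracker{
		listener: make(chan confirmation), // unbuffered, nobody reading yet
		held:     make(chan struct{}),
	}
	first := t.Publish()
	go t.One(confirmation{DeliveryTag: first, Ack: true}) // the "reader" side
	<-t.held             // One now holds m and will block on the send
	second = t.Publish() // does not need m, so it cannot get stuck behind One
	got = <-t.listener   // the publisher finally consumes the confirmation
	return second, got
}

func main() {
	second, got := demo()
	fmt.Println(second, got.DeliveryTag, got.Ack) // 2 1 true
}
```

With a mutex-based Publish, the second call in `demo` would wait for the lock that One cannot release until the confirmation is read.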

michaelklishin commented 3 years ago

Hey folks,

I'm posting this on behalf of the core team.

As you have noticed, this client hasn't seen a lot of activity recently. Many users are unhappy about that and we fully recognize that it's a popular library that should be maintained more actively. There are also many community members whose pull requests haven't been merged for various reasons.

Because this client has a long tradition of "no breaking public API changes", certain reasonable changes will likely never be accepted. This is frustrating to those who have put in their time and effort into trying to improve this library.

We would like to thank @streadway for developing this client and maintaining it for a decade. That is a remarkable contribution to the RabbitMQ ecosystem. We think now is a good time to get more contributors involved.

Team RabbitMQ has adopted a "hard fork" of this client in order to give the community a place to evolve the API. Several RabbitMQ core team members will participate but we think it very much should be a community-driven effort.

What do we mean by "hard fork" and what does it mean for you? The entire history of the project is retained in the new repository but it is not a GitHub fork by design. The license remains the same 2-clause BSD. The contribution process won't change much (except that we hope to review and accept PRs reasonably quickly).

What does change is that this new fork will accept reasonable breaking API changes according to Semantic Versioning (or at least our understanding of it). At the moment the API is identical to that of streadway/amqp but the package name is different. In the upcoming weeks we will begin reviewing PRs and merging those that make sense.

If your PR hasn't been accepted or reviewed, you are welcome to re-submit it for rabbitmq/amqp091-go. RabbitMQ core team members will evaluate the PRs currently open for streadway/amqp as time allows, and pull those that don't have any conflicts. We cannot promise that every PR would be accepted but at least we are open to changing the API going forward.

Note that it is a high season for holidays in some parts of the world, so we may be slower to respond in the next few weeks but otherwise, we are eager to review as many currently open PRs as practically possible soon.

Thank you for using RabbitMQ and contributing to this client. On behalf of the RabbitMQ core team, @chunyilyu and @michaelklishin.

rickyzhang82 commented 3 years ago

@RainJoe

I see your point now after investigating the core dump.

Channel.Publish deadlocks if the confirmation notification channel is not being consumed.

  1. In goroutine 98, the confirmation arrived before Channel.Publish finished. The goroutine acquired the mutex in confirms 0xc000b9b940 and is now blocked sending on the confirmation notification channel, waiting for someone to consume it.
  2. Goroutine 2869 is blocked in Publish, trying to acquire the same mutex in confirms 0xc000b9b940.

The workaround is to consume the confirmations in a separate goroutine and synchronize with Publish through another channel.

What a mess!

(dlv) gr 2869 bt
 0  0x000000000043a4c5 in runtime.gopark
    at /usr/local/go/src/runtime/proc.go:307
 1  0x000000000044af85 in runtime.goparkunlock
    at /usr/local/go/src/runtime/proc.go:312
 2  0x000000000044af85 in runtime.semacquire1
    at /usr/local/go/src/runtime/sema.go:144
 3  0x000000000046c267 in sync.runtime_SemacquireMutex
    at /usr/local/go/src/runtime/sema.go:71
 4  0x0000000000487b45 in sync.(*Mutex).lockSlow
    at /usr/local/go/src/sync/mutex.go:138
 5  0x0000000000a82d9a in sync.(*Mutex).Lock
    at /usr/local/go/src/sync/mutex.go:81
 6  0x0000000000a82d9a in github.com/streadway/amqp.(*confirms).Publish
    at /go/pkg/mod/github.com/streadway/amqp@v1.0.0/confirms.go:32
 7  0x0000000000a81edf in github.com/streadway/amqp.(*Channel).Publish
    at /go/pkg/mod/github.com/streadway/amqp@v1.0.0/channel.go:1360
  ....

(dlv) gr 2869 frame 6 args
c = ("*github.com/streadway/amqp.confirms")(0xc000b9b940)
~r0 = (unreadable empty OP stack)

(dlv) gr 98 bt
 0  0x000000000043a4c5 in runtime.gopark
    at /usr/local/go/src/runtime/proc.go:307
 1  0x0000000000405aea in runtime.chansend
    at /usr/local/go/src/runtime/chan.go:258
 2  0x0000000000405895 in runtime.chansend1
    at /usr/local/go/src/runtime/chan.go:143
 3  0x0000000000a82e57 in github.com/streadway/amqp.(*confirms).confirm
    at /go/pkg/mod/github.com/streadway/amqp@v1.0.0/confirms.go:45
 4  0x0000000000a82fc5 in github.com/streadway/amqp.(*confirms).One
    at /go/pkg/mod/github.com/streadway/amqp@v1.0.0/confirms.go:66
 5  0x0000000000a7ea65 in github.com/streadway/amqp.(*Channel).dispatch
    at /go/pkg/mod/github.com/streadway/amqp@v1.0.0/channel.go:314
 6  0x0000000000a7ec05 in github.com/streadway/amqp.(*Channel).recvMethod
    at /go/pkg/mod/github.com/streadway/amqp@v1.0.0/channel.go:351
 7  0x0000000000a84f55 in github.com/streadway/amqp.(*Connection).dispatchN
    at /go/pkg/mod/github.com/streadway/amqp@v1.0.0/connection.go:477
 8  0x0000000000a84a67 in github.com/streadway/amqp.(*Connection).demux
    at /go/pkg/mod/github.com/streadway/amqp@v1.0.0/connection.go:436
 9  0x0000000000a851f6 in github.com/streadway/amqp.(*Connection).reader
    at /go/pkg/mod/github.com/streadway/amqp@v1.0.0/connection.go:528
10  0x0000000000470001 in runtime.goexit
    at /usr/local/go/src/runtime/asm_amd64.s:1374

(dlv) gr 98 frame 4 args
c = ("*github.com/streadway/amqp.confirms")(0xc000b9b940)
confirmed = github.com/streadway/amqp.Confirmation {DeliveryTag: 1, Ack: true