streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Channel blocks despite connection is closed #518

Open melardev opened 2 years ago

melardev commented 2 years ago

Hi, I have an issue with version 1.0.0 of the library that looks like a deadlock. It is not always reproducible. Locally, I start my application, pause it long enough for the AMQP connection to time out, then resume it. If the app re-establishes the connection and the channels, I pause it again and repeat until the app ends up in a deadlock-like state.

I noticed my app was logging too many errors. My wrappers around *amqp.Channel were complaining that the channel was closed, and my wrapper around *amqp.Connection complained that the connection was closed, logged that it was retrying, and reconnected successfully. Then, when it began calling the channel wrappers so they could re-establish their channels, my connection wrapper went silent. That suggested it was stuck somewhere, and the stack traces do show a deadlock. When the connection was closed, I re-created it and then called the listeners so they could re-establish their channels:

[...]
amqpErr := <-c.errorChan
if amqpErr != nil {
  _ = c.connection.Close()
  // establish connection again
  [...]
  c.connection = newConnection
  // call listeners
  for _, l := range c.connectionListeners {
    l.OnNewConnection(newConnection)
  }
  [...]
}
[...]

One of my *amqp.Channel wrappers was stuck in code like this:

func (r *ChannelWrapper) OnNewConnection(conn *amqp.Connection) {
[...]
if r.channel != nil {
    _ = r.channel.Close()  // <---------------------- Stuck here
}

r.channel, err = conn.Channel()
[...]

Looking at the stack trace, it was stuck inside the streadway library at:

func (c *Connection) releaseChannel(id uint16) {
  c.m.Lock() // <---------- here
  defer c.m.Unlock()

  delete(c.channels, id)
  c.allocator.release(int(id))
}

That meant some other goroutine was already holding the mutex, so I searched for the holder and found:

func (c *Connection) shutdown(err *Error) {
  c.destructor.Do(func() {
    c.m.Lock() 
    defer c.m.Unlock()

    if err != nil {
      for _, c := range c.closes {
        c <- err // <------------------------------- Stuck here, with the mutex locked
      }
    }

I made sure the connection object was the same in both stack traces (0xc000270c80 in this run), so these are not two different *amqp.Connection objects.
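The two stack traces above fit a simple pattern: one goroutine sends on a notification channel while holding a mutex, and a second goroutine waits for that same mutex. Below is a minimal, standard-library-only sketch of that pattern. It is a simulation, not the library's code: the `conn` type, `notifyClose`, and `releaseFinishes` are hypothetical names, and it assumes the listener channel is not being read at the moment of shutdown, which this thread does not confirm.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// conn is a hypothetical stand-in for a connection that notifies
// listeners about its shutdown. It is not the library's type.
type conn struct {
	m      sync.Mutex
	closes []chan error
}

// notifyClose registers a listener channel.
func (c *conn) notifyClose(ch chan error) {
	c.m.Lock()
	defer c.m.Unlock()
	c.closes = append(c.closes, ch)
}

// shutdown delivers err to every listener while holding the mutex.
// locked is closed once the mutex is held (a hook for the demo only).
func (c *conn) shutdown(err error, locked chan<- struct{}) {
	c.m.Lock()
	defer c.m.Unlock()
	close(locked)
	for _, ch := range c.closes {
		ch <- err // blocks until a receiver is ready or the buffer has room
	}
}

// releaseChannel needs the same mutex, so it waits behind a blocked shutdown.
func (c *conn) releaseChannel() {
	c.m.Lock()
	defer c.m.Unlock()
}

// releaseFinishes reports whether releaseChannel returns within 200ms when
// the only listener channel has the given buffer size and nobody reads it.
func releaseFinishes(buffer int) bool {
	c := &conn{}
	c.notifyClose(make(chan error, buffer))

	locked := make(chan struct{})
	go c.shutdown(errors.New("connection closed"), locked)
	<-locked // shutdown now holds the mutex

	done := make(chan struct{})
	go func() {
		c.releaseChannel()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(200 * time.Millisecond):
		return false
	}
}

func main() {
	fmt.Println("unbuffered listener, release finishes:", releaseFinishes(0))
	fmt.Println("buffered listener, release finishes:", releaseFinishes(1))
}
```

If this is what happens in my app, then either reading the error channel continuously from a dedicated goroutine or giving it a buffer of at least one should let the send complete and the mutex be released.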

Really sorry for not providing a source code sample, the library was big and it was hard to create a simple demo. I just wanted to ask: is this a bug, or have I done something wrong?

If there is nothing wrong in the library, then why does the call not return once the connection is closed?

Situation 2

Next I made a change: OnNewConnection is now called asynchronously, so my connection wrapper no longer blocks when a listener gets stuck:

for _, l := range c.connectionListeners {
  // call the listener in another goroutine
  go l.OnNewConnection(newConnection)
}

Now it gets even more interesting. The new connection is re-established, sets up its error listener again, and moves on without waiting for the listeners to finish setting up their channels. My connection wrapper works fine and is now waiting for the next connection error, so it is blocked (as intended) here:

[...]
amqpErr := <-c.errorChan   // <----------- Here
if amqpErr != nil {

The old connection was closed, but the listener is still stuck at the same spot (so everything above still applies): it is stuck closing a channel that belonged to a connection that had already been closed.

I checked the pointers to make sure of this. The channel that got stuck belongs to the *amqp.Connection object 0xc0004ade00. The new connection, which was re-established and is working as expected while it waits for the next error, is 0xc000944280. It is the old, now closed connection object that appears both in shutdown(), which holds the mutex, and in releaseChannel, which is waiting for that mutex to be released.
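Running the listeners in goroutines keeps the connection wrapper alive, but a stuck listener then goes unnoticed. A minimal sketch of one way to surface it is to wait for the listeners with a timeout and report the ones that have not returned. The `listener` type, `notifyAll`, and `demo` are hypothetical names for illustration, and the 200ms timeout is arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// listener is a hypothetical stand-in for a wrapper that has an
// OnNewConnection-style callback.
type listener struct {
	name            string
	onNewConnection func()
}

// notifyAll runs every callback in its own goroutine and returns the
// names of the listeners that have not returned within timeout.
func notifyAll(ls []listener, timeout time.Duration) []string {
	// Buffered so a listener that finishes late never blocks on the send.
	done := make(chan int, len(ls))
	for i, l := range ls {
		go func(i int, l listener) {
			l.onNewConnection()
			done <- i
		}(i, l)
	}

	finished := make([]bool, len(ls))
	deadline := time.After(timeout)
wait:
	for range ls {
		select {
		case i := <-done:
			finished[i] = true
		case <-deadline:
			break wait
		}
	}

	var stuck []string
	for i, l := range ls {
		if !finished[i] {
			stuck = append(stuck, l.name)
		}
	}
	return stuck
}

// demo uses one listener that returns at once and one that never returns.
func demo() []string {
	never := make(chan struct{})
	return notifyAll([]listener{
		{name: "fast", onNewConnection: func() {}},
		{name: "stuck", onNewConnection: func() { <-never }},
	}, 200*time.Millisecond)
}

func main() {
	fmt.Println("listeners still running after timeout:", demo())
}
```

This does not unblock the stuck goroutine; it only makes the hang visible in the logs instead of silent.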

melardev commented 2 years ago

Hi, can somebody confirm whether this issue exists? It is a huge deal if the client stays stuck while the connection is closed ...

melardev commented 2 years ago

I greatly improved my code and the issue seems to have disappeared. I don't have proof, but after looking at the logs I can confidently say that the library very easily enters a deadlock state if two goroutines are managing the same connection/channel. I know channels are supposed to be used from a single goroutine, but I had a case where the channel re-establishment could be called from two goroutines. It was protected with a semaphore, but it was still being triggered from time to time, causing one of the following:

  1. operation channel.close caused a connection exception channel_error: "expected 'channel.open'"
  2. operation channel.open caused a connection exception channel_error: "second 'channel.open' seen"

My code may not have been the best, but IMO the library can't just hang there forever. Hanging forever also occurs when a Queue or Exchange is declared with a definition that differs from the existing Queue or Exchange. This did not happen to me in the past, when I simply got an error; now it gets stuck, and the only way to find out why is to read the RabbitMQ logs, which does not seem great to me either. A library meant to be used by people should never deadlock because an illegal operation is performed; it should return an error instead.
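For the double re-establishment described above, a semaphore alone only serializes the two callers; the second one still reopens after the first. A minimal sketch of one way to make the second caller a no-op is a generation counter checked under the lock. The `reopener` type and its methods are hypothetical and not part of any AMQP library.

```go
package main

import (
	"fmt"
	"sync"
)

// reopener is a hypothetical guard around "re-establish the channel".
// gen is incremented after every successful reopen.
type reopener struct {
	mu  sync.Mutex
	gen int
}

// generation returns the current generation. A caller records it while
// the channel still works and passes it to reopen after a failure.
func (r *reopener) generation() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.gen
}

// reopen calls open only if nobody has reopened since seenGen was read.
// It reports whether this call performed the reopen.
func (r *reopener) reopen(seenGen int, open func() error) (bool, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if seenGen != r.gen {
		return false, nil // another goroutine already reopened
	}
	if err := open(); err != nil {
		return false, err
	}
	r.gen++
	return true, nil
}

// concurrentReopens lets several goroutines report the same failure and
// returns how many times open actually ran.
func concurrentReopens(workers int) int {
	r := &reopener{}
	seen := r.generation()
	opens := 0
	open := func() error { opens++; return nil } // runs while r.mu is held

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = r.reopen(seen, open)
		}()
	}
	wg.Wait()
	return opens
}

func main() {
	fmt.Println("open calls for 8 concurrent reports:", concurrentReopens(8))
}
```

Note that `open` runs with the mutex held, so in real code it would need its own timeout; otherwise a hanging open blocks every other caller in the same way as the hang described in this issue.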

lukebakken commented 2 years ago

"Really sorry for not providing a source code sample, the library was big and hard to create a simple demo"

Thanks for the information, but the library maintainers don't have the time to try and guess what could cause this. Providing code that can be compiled and run is a minimum requirement and is appreciated.

You should also try your code with the go client supported by the RabbitMQ engineering team - https://github.com/rabbitmq/amqp091-go

If you can reproduce your problem, open an issue there and provide runnable code.

melardev commented 2 years ago

@lukebakken Hi, I understand. Do you want me to create another issue? Thanks for pointing me to the other library; I am happy to see a RabbitMQ Go client that is better maintained than this one. I will definitely try to move to it ASAP, but for now I want to see what I can do to "fix my bugs" with streadway. If you can help, that would be great.

For now I provided the code in this Gist file: https://gist.github.com/melardev/829c82e4a7c415c55a299f6d7b53d69d

Sorry that the code is not that short. It looks horrible and very wrong, but the idea is to show when the library hangs; I assume the library should never hang, even if the user is misusing it.

With this snippet I saw it hang on channel.Close() and channel.QueueDeclare.

I use the files as a project. I launch main from the GoLand IDE and let it run without breakpoints. After 1 minute I set a breakpoint on https://gist.github.com/melardev/829c82e4a7c415c55a299f6d7b53d69d#file-base_channel-go-L92. If it gets hit, the program is not completely stuck, so I remove the breakpoint and resume. If the breakpoint is never hit, the program is stuck, so I pause execution and explore the goroutine stack traces to see where each one is blocked.
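The same stack traces can also be captured without a debugger. A minimal sketch using the standard `runtime/pprof` package follows; `stuckWorker` and `dumpMentions` are hypothetical helpers that exist only to give the dump something to show.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
	"strings"
)

// stuckWorker is a hypothetical goroutine that blocks until released,
// standing in for a goroutine stuck inside a library call.
func stuckWorker(started chan<- struct{}, release <-chan struct{}) {
	close(started)
	<-release
}

// goroutineDump returns the stack traces of all goroutines as text.
// debug=2 prints them in the same form as an unrecovered panic.
func goroutineDump() string {
	var buf bytes.Buffer
	if err := pprof.Lookup("goroutine").WriteTo(&buf, 2); err != nil {
		return "dump failed: " + err.Error()
	}
	return buf.String()
}

// dumpMentions starts a blocked goroutine and reports whether the given
// function name appears in the dump taken while it is still running.
func dumpMentions(name string) bool {
	started := make(chan struct{})
	release := make(chan struct{})
	go stuckWorker(started, release)
	<-started
	defer close(release)
	return strings.Contains(goroutineDump(), name)
}

func main() {
	fmt.Println("dump mentions stuckWorker:", dumpMentions("stuckWorker"))
}
```

Calling something like `goroutineDump` from a watchdog goroutine when progress stops would record where every goroutine is blocked, which is the same information I read from the IDE after pausing.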

Update#1: https://github.com/rabbitmq/amqp091-go behaves the same way, it also hangs.

jqiris commented 2 years ago

I ran into the same problem: as soon as the connection fails abnormally, messages get blocked and the whole program hangs.