knative-extensions / eventing-rabbitmq

RabbitMQ eventing components. Knative Source and Broker.
Apache License 2.0

Dispatcher deadlock occurs when the rabbitmq server is down while the dispatcher is waiting for the response #1369

Closed ckyoog closed 5 months ago

ckyoog commented 7 months ago

It is easy to spot this bug if the following 2 criteria are fulfilled.

  1. The rabbitmq server is not stable; it goes down many times a day.
  2. The event handling on the subscriber/deadLetterSink side is slow. (The time from the dispatcher sending the event to the subscriber/deadLetterSink returning the response is long.)

(Luckily, or unluckily, my testing environment meets both of them.)

Describe the bug
In the beginning, I noticed an issue with the dispatcher. Every time the rabbitmq server goes down, stays down for a short time, and then comes back up, the dispatcher can't restart consuming. It can reconnect to the rabbitmq server, but it just can't restart consuming. If you check on the rabbitmq server side (e.g. in the management UI), you will see there is no consumer for the queue.

This issue kept happening in my test env, which drove me to investigate it.

With many debugging logs added, I narrowed the suspicious code down to the file pkg/dispatcher/dispatcher.go.

    connNotifyChannel, chNotifyChannel := conn.NotifyClose(make(chan *amqp.Error)), channel.NotifyClose(make(chan *amqp.Error))
    for {
        select {
        case <-ctx.Done():
            logging.FromContext(ctx).Info("context done, stopping message consumers")
            finishConsuming(wg, workerQueue)
            return ctx.Err()
        case <-connNotifyChannel:
            finishConsuming(wg, workerQueue)
            return amqp.ErrClosed
        case <-chNotifyChannel:
            finishConsuming(wg, workerQueue)
            return amqp.ErrClosed
        case msg, ok := <-msgs:
            if !ok {
                finishConsuming(wg, workerQueue)
                return amqp.ErrClosed
            }
            workerQueue <- msg
        }
    }
  1. After the rabbitmq server shuts down, the notification channels registered with Connection.NotifyClose() or Channel.NotifyClose() become readable;
  2. that causes finishConsuming() to be called, which in turn calls wg.Wait() to wait for the dispatch goroutines to finish (see the sketch after this list);
  3. and the problem is right here: the dispatch goroutine blocks. The dispatch() function called in that goroutine gets stuck and never returns.
  4. With further debugging, I found that dispatch() is stuck at the call to msg.Ack().
  5. At that moment, the current connection to the rabbitmq server is already lost (sooner or later, another goroutine, watchRabbitMQConnections(), will create a new connection). According to the amqp091 docs, the call to msg.Ack() is supposed to return an error indicating the connection is lost, rather than block there.
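
To make that blocking chain concrete, here is a minimal, self-contained sketch (plain Go, not the dispatcher's actual code; the worker, the queue, and the never-returning call are simplified stand-ins): one worker stuck in its acknowledgement keeps wg.Wait(), and therefore finishConsuming() and its caller, from ever returning.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var wg sync.WaitGroup
        workerQueue := make(chan string)

        // Stand-in for the dispatch goroutine: it picks up a message and then
        // blocks forever, the way msg.Ack() does in this bug.
        wg.Add(1)
        go func() {
            defer wg.Done()
            msg := <-workerQueue
            fmt.Println("dispatching", msg)
            select {} // simulate msg.Ack() never returning
        }()

        workerQueue <- "event"

        // Stand-in for finishConsuming(): it waits for all workers, so it can
        // never return while one worker is stuck -- and neither can its caller
        // (ConsumeFromQueue in the real dispatcher).
        done := make(chan struct{})
        go func() {
            wg.Wait()
            close(done)
        }()

        select {
        case <-done:
            fmt.Println("consuming finished")
        case <-time.After(2 * time.Second):
            fmt.Println("wg.Wait() is still blocked after 2s, just like the dispatcher")
        }
    }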

(What makes the msg.Ack() block?)

Expected behavior
Every time the rabbitmq server goes down, stays down for any length of time, and then comes back up, the dispatcher can restart consuming.

To Reproduce

  1. First, create a temporary slow event handler (subscriber/deadLetterSink), like the one below
    from time import sleep

    @api.post("/")   # "api" is the web framework's app/router object from the original snippet
    def event_handler():
        print("-=start=-")
        sleep(120)   # hold the response long enough to shut rabbitmq down in the meantime
        return 200, ""
  2. Trigger the event. The event goes through the rabbitmq queue and the dispatcher and finally reaches the slow event handler.
  3. Immediately shut down the rabbitmq server once we see "-=start=-" printed out.
  4. Keep the rabbitmq server down for a couple of seconds. Make sure the dispatcher has detected that the server is down -- the NotifyClose channel becomes readable and finishConsuming() is called.
  5. Bring the rabbitmq server back up. We will see no more dispatcher error logs about attempts to reconnect to the rabbitmq server, which indicates that the dispatcher has established a new connection to the server. But on the rabbitmq server side, we will see no consumer for the queue. If we re-trigger the event, it won't be consumed and won't be sent to the event handler.

Knative release version
From 1.10.0 to 1.13.0. (I didn't test older versions; I think they might behave the same.)

Additional context
In the end, I managed to figure out the root cause: a deadlock that blocks msg.Ack(). The deadlock occurs this way.

In the dispatcher, in the dispatch goroutine

  1. The dispatch goroutine is blocked at the call to ceClient.Request(), waiting for the slow subscriber's response

The RabbitMQ server, the remote peer, shuts down

In the amqp091 library

  1. The loss of the connection is detected by the amqp091 package first, and Connection.shutdown() is called. (The caller is Connection.send().)
  2. Connection.shutdown() sends the error to the Connection.NotifyClose channel to notify the consumer (the dispatcher in this case) that the connection to the remote server is closed

In the dispatcher, in the main goroutine, the piece of code shown above is triggered.

  1. The dispatcher receives the notification from the Connection.NotifyClose channel
  2. It will call finishConsuming()
  3. which will, in turn, call wg.Wait() to wait for all the dispatch goroutines to call wg.Done(). (But right now, one of the dispatch goroutines is stuck at the call to ceClient.Request().)

In the amqp091 library,

  1. Connection.shutdown() goes on to call Channel.shutdown()
  2. Channel.shutdown() first calls ch.m.Lock(), then does the similar thing -- it sends the error to the Channel.NotifyClose channel to notify the consumer (the dispatcher in this case) that the RabbitMQ channel is closed.

The first deadlock occurs here.

Sending the error to the Channel.NotifyClose channel blocks, because there is no receiver on that channel.

Why is there no receiver on Channel.NotifyClose? Look at the piece of code above again. The receiver of the Channel.NotifyClose channel is the case <-chNotifyChannel: branch. It will never be reached, because another branch, case <-connNotifyChannel:, has already been taken, and as a result the goroutine is either blocked in finishConsuming() or has returned from the function.
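
A standalone sketch of just this pattern (not the library code itself): once the select has committed to one branch and moved on, an unbuffered send on the other channel has no receiver and blocks forever.

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        connClosed := make(chan error) // unbuffered, like the NotifyClose channels above
        chanClosed := make(chan error)

        // Stand-in for the consumer loop: it reacts to the first notification
        // and then stops reading both channels (it has moved on to "finishConsuming").
        go func() {
            select {
            case err := <-connClosed:
                fmt.Println("connection closed:", err)
            case err := <-chanClosed:
                fmt.Println("channel closed:", err)
            }
        }()

        connClosed <- fmt.Errorf("connection lost") // delivered: the select takes this branch

        // Stand-in for Channel.shutdown(): this unbuffered send now has no
        // receiver left, so on its own it would block forever.
        select {
        case chanClosed <- fmt.Errorf("channel lost"):
            fmt.Println("channel notification delivered")
        case <-time.After(2 * time.Second):
            fmt.Println("send of the channel-close notification is still blocked after 2s")
        }
    }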

(By the way, this deadlock blocks one of the sub-goroutines, the one calling the shutdown functions. Because it doesn't block the main goroutine, it doesn't look that harmful by itself.)

The final consequence of this part is:

  1. Channel.shutdown() can't return, because of the deadlock, so
  2. the mutex ch.m can never be unlocked.

Back to the dispatcher, in the dispatch goroutine

  1. The response to ceClient.Request() is finally received, and msg.Ack() is called
  2. msg.Ack() eventually calls ch.m.Lock() to lock the same mutex, which is still being held

So obviously, this is the second deadlock.
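
Reduced to a standalone sketch (again just the pattern described above, not the amqp091 source): a goroutine that holds a mutex while blocked on an unbuffered channel send makes every later Lock() on that mutex block as well.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var m sync.Mutex           // stand-in for ch.m
        notify := make(chan error) // stand-in for the Channel.NotifyClose channel

        // Stand-in for Channel.shutdown(): take the lock, then block forever
        // trying to deliver the close notification that nobody is reading.
        go func() {
            m.Lock()
            defer m.Unlock() // never runs, because the send below never completes
            notify <- fmt.Errorf("channel closed")
        }()

        time.Sleep(100 * time.Millisecond) // let the "shutdown" goroutine grab the lock

        // Stand-in for msg.Ack(): it needs the same mutex, so it blocks too.
        acked := make(chan struct{})
        go func() {
            m.Lock()
            defer m.Unlock()
            close(acked)
        }()

        select {
        case <-acked:
            fmt.Println("ack completed")
        case <-time.After(2 * time.Second):
            fmt.Println("the ack is still waiting for ch.m after 2s: the second deadlock")
        }
    }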

To sum up, the whole deadlock chain looks like this:

  1. The amqp091 library sends the error to the unbuffered channels registered via Connection.NotifyClose() and Channel.NotifyClose().
  2. Once the dispatcher receives the error from the Connection.NotifyClose channel, it no longer reads the Channel.NotifyClose channel, which blocks the sender, Channel.shutdown().
  3. Channel.shutdown() being blocked keeps the mutex ch.m locked.
  4. The mutex ch.m staying locked blocks msg.Ack().
  5. msg.Ack() blocks the dispatch goroutine, so wg.Done() is never called.
  6. That blocks finishConsuming() at the call to wg.Wait().
  7. finishConsuming(), in turn, blocks its caller, ConsumeFromQueue(), which eventually results in the dispatcher being unable to restart consuming.

Referring to the amqp091 documentation, I believe the deadlock comes from improper use of the NotifyClose channels; see "Best practises for Connection and Channel notifications".
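
For illustration, here is a minimal sketch of the buffered-notification pattern along those lines (the function name, signature, and surrounding setup are hypothetical, and this is not necessarily the fix the PR implements):

    package example

    import (
        amqp "github.com/rabbitmq/amqp091-go"
    )

    // consumeUntilClosed registers *buffered* close-notification channels so the
    // library's send inside shutdown() can complete even when nobody is receiving
    // at that exact moment. conn, channel, msgs, and handle are assumed to be set
    // up by the caller; the names are illustrative, not the dispatcher's real API.
    func consumeUntilClosed(conn *amqp.Connection, channel *amqp.Channel,
        msgs <-chan amqp.Delivery, handle func(amqp.Delivery)) error {

        // Capacity 1 is enough: amqp091 delivers at most one error and then
        // closes the notification channel.
        connNotify := conn.NotifyClose(make(chan *amqp.Error, 1))
        chNotify := channel.NotifyClose(make(chan *amqp.Error, 1))

        for {
            select {
            case err, ok := <-connNotify:
                if ok && err != nil {
                    return err
                }
                return amqp.ErrClosed
            case err, ok := <-chNotify:
                if ok && err != nil {
                    return err
                }
                return amqp.ErrClosed
            case msg, ok := <-msgs:
                if !ok {
                    return amqp.ErrClosed
                }
                handle(msg)
            }
        }
    }

With the buffer in place, even after the consumer loop has committed to the connection-close branch and moved on, Channel.shutdown() can still deliver its error, release ch.m, and let msg.Ack() return an error as the docs describe.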

So I have come up with my own fix. A PR will be created soon.

ikavgo commented 7 months ago

Cool story, I appreciate your dedication. Deadlocks are certainly my favorite kind of bug :-) I left a small request on the PR. Thanks!

ikavgo commented 7 months ago

FYI - https://github.com/rabbitmq/amqp091-go/pull/256. Thank you again for the investigation and PR. I would love to learn more about your use case; are you on CNCF Slack?

ckyoog commented 7 months ago

Yes I am.

ckyoog commented 5 months ago

Since the PR is merged and the issue is fixed for me, I will close this issue.