streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License
4.88k stars 621 forks source link

amqp.Channel's method InspectQueue blocks when connection is lost #501

Open PeterEFinch opened 3 years ago

PeterEFinch commented 3 years ago

A colleague and I noticed that amqp.Channel's method InspectQueue is sometimes blocking when the connection is lost. I have included a minimal example of the code used below.

I tested this code under two different circumstances:

  1. AMQP_URL had the address of a local RabbitMQ server running in docker (rabbitmq:3-management). Once the connection was established I stopped the docker container. The program stopped as expected (see expected_output.txt).
  2. AMQP_URL had the address of our production RabbitMQ server. Once the connection was established I disconnected the internet. The program never stopped and appeared to be stuck at the method channel.QueueInspect (see blocking_output.txt).

Additional information: I ran this code inside GoLand with go version go1.16.2 darwin/amd64 . I removed the preamble but left in the exit message .

Hopefully, this problem should be reproducible.

package main

import (
    "fmt"
    "os"
    "time"

    "github.com/streadway/amqp"
)

func main() {
    const queue = "bug_testing_queue"

    url, ok := os.LookupEnv("AMQP_URL")
    if !ok {
        panic("no environment variable AMQP_URL")
    }

    connection, err := amqp.Dial(url)
    if err != nil {
        panic(err)
    }

    channel, err := connection.Channel()
    if err != nil {
        panic(err)
    }

    _, err = channel.QueueDeclare(queue, true, false, false, false, nil)
    if err != nil {
        panic(err)
    }

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        fmt.Println("waiting")
        select {
        case <-ticker.C:
            fmt.Println("pre queue inspect")
            _, err := channel.QueueInspect(queue)
            fmt.Println("post queue inspect")
            if err != nil {
                fmt.Printf("unable to inspect queue: %v\n", err)
                return
            }
        }
    }
}

Finally, adding

notifyCh := channel.NotifyClose(make(chan *amqp.Error))

before the for loop and

case <-notifyCh:
fmt.Println("channel closed unexpectedly")
return

to the select statement doesn't prevent the blocking output.

PeterEFinch commented 3 years ago

@michaelklishin I noticed you have said that Team RabbitMQ plans on forking this library (https://github.com/streadway/amqp/issues/497#issuecomment-791068860) and that @streadway lacks time (https://github.com/streadway/amqp/issues/497#issuecomment-791263157). Is there anyway we can move such issues to this planned library?