wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License
768 stars 125 forks source link

AutoReconnect only reconnect the consumer to 1 queue instead of all the queues #87

Closed FR19 closed 1 year ago

FR19 commented 2 years ago

Sorry I'm new to golang, I just created 1 consumer to consume multiple queues. But when the connection dropped, and the reconnect mechanism started, it successfully reconnect but only to 1 queue. Below is my code for POC

// main.go
package main

import (
    "log"
    "strings"
    "sync"

    . "poc-reconnect/worker/east"
    . "poc-reconnect/worker/west"

    rabbitmq "github.com/wagslane/go-rabbitmq"
)

var workerList = make(map[string]interface {
    Execute(wg *sync.WaitGroup, concurrent int)
})

func main() {
    amqpAddress := "amqp://guest:guest@localhost:5672/poc"

    // initialize consumer
    consumer, err := rabbitmq.NewConsumer(
        amqpAddress, rabbitmq.Config{},
    )
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Register the workers
    workerList["west"] = West{Consumer: consumer}
    workerList["east"] = East{Consumer: consumer}

    // worker concurrent configuration
    workerConcurrent := map[string]int{
        "west": 2,
        "east": 2,
    }

    wg := sync.WaitGroup{}

    // run all worker
    for worker, thread := range workerConcurrent {
        wg.Add(1)
        go workerList[strings.ToLower(worker)].Execute(&wg, thread)
    }
    wg.Wait()
}
// worker/east/east.go
package east

import (
    "log"
    "sync"

    rabbitmq "github.com/wagslane/go-rabbitmq"
)

type East struct {
    Consumer rabbitmq.Consumer
}

const work_queue = "east_queue"

func (e East) Execute(wg *sync.WaitGroup, concurrent int) {
    defer wg.Done()

    forever := make(chan bool)
    // Subscribing to the queue
    err := e.Consumer.StartConsuming(
        func(d rabbitmq.Delivery) rabbitmq.Action {
            log.Printf("Received message from %s with content: %s\n", work_queue, d.Body)
            // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
            return rabbitmq.Ack
        },
        work_queue,
        []string{work_queue},
        rabbitmq.WithConsumeOptionsConcurrency(concurrent),
        rabbitmq.WithConsumeOptionsQueueDurable,
        rabbitmq.WithConsumeOptionsBindingExchangeName("exchange"),
        rabbitmq.WithConsumeOptionsBindingExchangeKind("direct"),
        rabbitmq.WithConsumeOptionsBindingExchangeDurable,
    )

    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Subscribing to %s for getting messages on %d goroutines\n", work_queue, concurrent)

    <-forever
}
// worker/west/west.go
package west

import (
    "log"
    "sync"

    rabbitmq "github.com/wagslane/go-rabbitmq"
)

type West struct {
    Consumer rabbitmq.Consumer
}

const work_queue = "west_queue"

func (w West) Execute(wg *sync.WaitGroup, concurrent int) {
    defer wg.Done()

    forever := make(chan bool)
    // Subscribing to the queue
    err := w.Consumer.StartConsuming(
        func(d rabbitmq.Delivery) rabbitmq.Action {
            log.Printf("Received message from %s with content: %s\n", work_queue, d.Body)
            // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
            return rabbitmq.Ack
        },
        work_queue,
        []string{work_queue},
        rabbitmq.WithConsumeOptionsConcurrency(concurrent),
        rabbitmq.WithConsumeOptionsQueueDurable,
        rabbitmq.WithConsumeOptionsBindingExchangeName("exchange"),
        rabbitmq.WithConsumeOptionsBindingExchangeKind("direct"),
        rabbitmq.WithConsumeOptionsBindingExchangeDurable,
    )

    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Subscribing to %s for getting messages on %d goroutines\n", work_queue, concurrent)

    <-forever
}

On the first run, the consumer correctly initialized

image image

But after I force close the connection through the RabbitMQ UI to trigger the reconnect mechanism, it only reconnect to 1 queue

image image

Is this bug or did I do something wrong in my code?

Thanks

wagslane commented 1 year ago

At the moment its designed such that you should use one consumer per queue