wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License
785 stars 128 forks source link

graceful shutdown is invalid #174

Closed nilnoun closed 2 months ago

nilnoun commented 3 months ago

The expectation was to exit after completing the task, but it exited immediately upon receiving the exit signal.


example:

comsumer.go

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/wagslane/go-rabbitmq"
)

func main() {
    conn, err := rabbitmq.NewConn(
        "amqp://guest:guest@localhost",
        rabbitmq.WithConnectionOptionsLogging,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    consumer, err := rabbitmq.NewConsumer(
        conn,
        "my_queue",
        rabbitmq.WithConsumerOptionsQueueDurable,
        rabbitmq.WithConsumerOptionsConcurrency(1),
        rabbitmq.WithConsumerOptionsQOSPrefetch(1),
    )
    if err != nil {
        log.Fatal(err)
    }

    sigs := make(chan os.Signal, 1)

    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        fmt.Println("awaiting signal")
        sig := <-sigs

        fmt.Println()
        fmt.Println(sig)
        fmt.Println("stopping consumer")

        consumer.Close()
    }()

    // block main thread - wait for shutdown signal
    err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
        for i := 0; i < 10; i++ {
            log.Printf("consumed: %v, %d", string(d.Body), i)
            time.Sleep(1 * time.Second)
        }
        // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
        return rabbitmq.Ack
    })
    if err != nil {
        log.Fatal(err)
    }
}

output

awaiting signal
2024/07/01 11:15:54 gorabbit INFO: Processing messages on 1 goroutines
2024/07/01 11:15:54 consumed: 1f7e305c-8361-462d-8bfc-60f3ee6c36b4, 0
2024/07/01 11:15:55 consumed: 1f7e305c-8361-462d-8bfc-60f3ee6c36b4, 1
^C
interrupt
stopping consumer
2024/07/01 11:15:55 gorabbit INFO: waiting for handler to finish...
2024/07/01 11:15:55 gorabbit INFO: closing channel manager...
2024/07/01 11:15:55 gorabbit INFO: closing consumer...
2024/07/01 11:15:55 gorabbit INFO: closing connection manager...
2024/07/01 11:15:55 gorabbit INFO: amqp channel closed gracefully
thibleroy commented 3 months ago

Hello, I have tried to fix this: https://github.com/wagslane/go-rabbitmq/pull/175

nilnoun commented 3 months ago

Thank you.

wagslane commented 2 months ago

fix is merged, please open a new issue if 0.14.2 doesn't fix