wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License
805 stars 128 forks source link

consumer.StopConsuming didn't close consumer right away #58

Closed mrsdpz closed 2 years ago

mrsdpz commented 2 years ago

In the examples `package main

import ( "fmt" "log" "os" "os/signal" "syscall" "time"

amqp "github.com/rabbitmq/amqp091-go"
rabbitmq "github.com/wagslane/go-rabbitmq"

)

var consumerName = "example"

func main() { consumer, err := rabbitmq.NewConsumer( "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithConsumerOptionsLogging, ) if err != nil { log.Fatal(err) } err = consumer.StartConsuming( func(d rabbitmq.Delivery) rabbitmq.Action { log.Printf("consumed: %v", string(d.Body)) // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue for i:=1;i<10;i++{ fmt.Println(i) time.Sleep(1* time.Second) } return rabbitmq.Ack }, "my_queue", []string{"routing_key", "routing_key_2"}, rabbitmq.WithConsumeOptionsConcurrency(1), rabbitmq.WithConsumeOptionsQueueDurable, rabbitmq.WithConsumeOptionsQuorum, rabbitmq.WithConsumeOptionsBindingExchangeName("events"), rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"), rabbitmq.WithConsumeOptionsBindingExchangeDurable, rabbitmq.WithConsumeOptionsConsumerName(consumerName), ) if err != nil { log.Fatal(err) }

// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)

signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

go func() {
    sig := <-sigs
    fmt.Println()
    fmt.Println(sig)
    done <- true
}()

fmt.Println("awaiting signal")
<-done
fmt.Println("stopping consumer")

// wait for server to acknowledge the cancel
noWait := false
consumer.StopConsuming(consumerName, noWait)
time.Sleep(9* time.Second)
consumer.Disconnect()

} ` I want to execute the task after stop consume

I stopped the program after a few seconds, `2022/01/05 19:47:08 gorabbit: Processing messages on 1 goroutines 2022/01/05 19:47:08 consumed: hello, world 1 awaiting signal 2 3 4 ^C interrupt stopping consumer 5 6 7 8 9 2022/01/05 19:47:17 consumed: hello, world 1 2 3 4

Process finished with the exit code 0

` Its start a new consume after ctrl c

wagslane commented 2 years ago

Please format this better!

wagslane commented 2 years ago

I'm unable to reproduce, I think this has been fixed. If not, please reopen with better formatting!