Closed MCBrandenburg closed 6 years ago
I've added dead letter exchanges to my queue and when deploying the app to k8s the channel is closing but the connection is still alive
rmqConn, err := amqp.Dial(rmqDSN) if err != nil { panic(err) } defer rmqConn.Close() rmqChan, err := rmqConn.Channel() if err != nil { panic(err) } defer rmqChan.Close() args := make(amqp.Table) args["x-dead-letter-exchange"] = "errors" args["x-dead-letter-routing-key"] = "listening" listen, err := rmqChan.QueueDeclare("listening", true, false, false, false, args) if err != nil { panic(err) } args["x-dead-letter-routing-key"] = "publish" publish, err := rmqChan.QueueDeclare("publish", true, false, false, false, args) if err != nil { panic(err) } errChan := make(chan error) msgChan := make(chan amqp.Delivery) msg, err := rmqChan.Consume(listen.Name, "", true, false, false, false, nil) if err != nil { panic(err) } wg.Add(1) go func() { defer wg.Done() for d := range msg { msgChan <- d } }() defer wg.Wait() for { select { case err := <-errChan: log.Error(err.Error()) case del := <-msgChan: //work err = rmqChan.Publish("", publish.Name, false, false, amqp.Publishing{Body: b}) if err != nil { del.Reject(false) break } del.Ack(false) } } defer wg.Wait()
I process a random number of messages and I get the following error and then the application hangs
Exception (504) Reason: "channel/connection is not open"
Figured out the issue, I had autoAck set to true
autoAck
true
I've added dead letter exchanges to my queue and when deploying the app to k8s the channel is closing but the connection is still alive
I process a random number of messages and I get the following error and then the application hangs
Exception (504) Reason: "channel/connection is not open"