Closed sebastienfr closed 8 years ago
I managed to reproduce the issue, it's about the routing_key. I have 1 producer using two cony clients to send data in the same queue and exchange but with a different routing key. On the other side I have 2 separate subscribers receiving the messages each with a different routing_key but same queue and exchange.
When only one client is connected he gets all the messages from routing_key 1 and 2. When both are connected, sometimes it works, each one receives its routed messages, but sometimes one receives the messages of the other one.
When I use the rabbit management interface, the queue is fine with all the messages with two different routing_key. I have to check my subscriber code but at first sight it looks fine.
Stay tuned...
Here's the consumer source code running in a dedicated go routine and pushing messages to a channel.
`logger.WithField("config", config).Debug("preparing new consummer") // construct new client with the flag url // and default backoff policy cli := cony.NewClient( cony.URL(config.URL), cony.Backoff(DefaultBackoff), )
// declarations
// the queue name will be supplied by the AMQP server
que := &cony.Queue{
Name: config.Queue,
Durable: config.Durable,
AutoDelete: config.AutoDelete,
Exclusive: config.Exclusive,
}
exc := cony.Exchange{
Name: config.Exchange,
Kind: config.ExchangeType,
AutoDelete: config.AutoDelete,
Durable: config.Durable,
}
bnd := cony.Binding{
Queue: que,
Exchange: exc,
Key: config.RoutingKey,
}
cli.Declare([]cony.Declaration{
cony.DeclareQueue(que),
cony.DeclareExchange(exc),
cony.DeclareBinding(bnd),
})
// prepare consumerOpt from config
options := make([]cony.ConsumerOpt, 0)
if config.AutoAck {
options = append(options, cony.AutoAck())
}
if config.Exclusive {
options = append(options, cony.Exclusive())
}
if config.NoLocal {
options = append(options, cony.NoLocal())
}
// declare and register a consumer
cns := cony.NewConsumer(
que,
options...,
)
cli.Consume(cns)
logger.Debug("starting client loop to receive messages")
for cli.Loop() {
select {
case msg := <-cns.Deliveries():
logger.WithField("delivery", msg).Debug("received message")
config.Messages <- msg.Body
// if needed auto ack messages
if !config.AutoAck {
err := msg.Ack(false)
if err != nil {
logger.WithField("msg", msg).WithField("error", err).Debug("fail to send ack on message")
}
}
case err := <-cns.Errors():
logger.WithField("error", err).Warn("consummer error")
case err := <-cli.Errors():
logger.WithField("error", err).Warn("client error")
}
}
logger.Debug("exiting subscriber connection loop")`
@sebastienfr It seems that there is some misconception about how RabbitMQ works.
Consumers don't care about routing keys, routing keys only used during publishing, when new message arrives to exchange, exchange decide in which queue this message will go (based on routing key). Consuming single queue with 2 consumers means round-robin pattern of message distribution between consumers.
Hello,
Thank you for your reply, looks like I misunderstood the rmq tutorial, because in the binding the consumer side is using the routing key, else why would it be needed ? I'll check again the docs https://www.rabbitmq.com/tutorials/tutorial-four-go.html.
Thanks for your answer
Hi,
I was working on existing code, as you suggested it was effectively wrong about the single queue and the key, it's fine now, I'm using 2 queues as you suggested it. It's quite amazing this was working 99% of the time. Thanks for your help.
Hi,
I was just wondering if any of you experienced issue receiving messages twice or missing some messages some times. I have each rmq message received and processed in a separate go routine and I added message numbering. I can see from the logs in rabbit and in my software that between reception and the handling of the message some strange behavior occur. I'll will dig further, but in any case, feel free to share your experience.