Open estahn opened 5 years ago
The rabbitmq-cli-Consumer does not deal with concurrency. It processes one message at a time. If you need concurrency, you can spin up as mutch instances as you need.
@corvus-ch Would you be willing to accept a PR that implements handling of concurrency? We currently have a PHP version of what rabbitmq-cli-consumer
does, which basically runs sequentially as well. We upscale our Kubernetes pods based on the queue length, but that seems all a bit wasteful.
Dealing with concurrency is a complexity I tried to keep out of the project. The way it is designed is to delegate it to a suitable process supervisor. With Kubernetes in mind (and similar platforms for that matter), I can see why this might be helpful from an orchestrationanl point of view.
If I would accept such a PR, the following requirements must be met:
With those two things in mind, feel free to go ahead and provide PR.
@corvus-ch Working through the code I'm wondering why the func (p *processor) Process
is given the message as type delivery.Delivery
instead of just the plain message.
Shouldn't the responsibility of ack/nack'ing be lying on the consumer? I would imagine the processor returns success/failure and the consumer will ack/nack based on that.
@corvus-ch I have something that is working to some extent, but I'm not sure where that exception is coming from.
2019/02/27 14:41:50 Connecting RabbitMQ...
2019/02/27 14:41:50 Connected.
2019/02/27 14:41:50 Opening channel...
2019/02/27 14:41:50 Done.
2019/02/27 14:41:50 Setting QoS...
2019/02/27 14:41:50 Succeeded setting QoS.
2019/02/27 14:41:50 Declaring queue "foobar"...
2019/02/27 14:41:50 Registering consumer...
2019/02/27 14:41:50 Succeeded registering consumer.
2019/02/27 14:41:50 Processing messages with 3 workers.
2019/02/27 14:41:50 Waiting for messages...
2019/02/27 14:41:50 [Worker 2] Processing message...
2019/02/27 14:41:50 [Worker 0] Processing message...
2019/02/27 14:41:50 [Worker 1] Processing message...
hellohello2019/02/27 14:42:00 [Worker 0] Processed!
2019/02/27 14:42:00 [Worker 0] Processing message...
hello2019/02/27 14:42:00 [Worker 2] Processed!
2019/02/27 14:42:00 [Worker 2] Processing message...
2019/02/27 14:42:00 [Worker 1] Processed!
2019/02/27 14:42:00 Exception (406) Reason: "PRECONDITION_FAILED - unknown delivery tag 1"
@estahn Can you do a pull request with your changes so I can have a look?
Searching for that error on the web, indicates that your code might be mixing up messages and acknowledgments and or that sharing channels across threads/(go functions?) is an issue. Without looking at the code, it is not possible for me to tell what the issue is.
@corvus-ch Will do shortly. I think the issue is double acking of the message (https://github.com/streadway/amqp/issues/83).
Just some comments:
unknown delivery tag 1
. Possibly double ack-ing the message.In order to find what the issue with the unknown delivery tag is, I have hacked together a minimal example.
package main
import (
"flag"
"log"
"github.com/streadway/amqp"
)
var (
uri = flag.String("uri", "amqp://guest:guest@localhost", "AMQP URI")
queue = flag.String("queue", "myqueue", "Ephemeral AMQP queue name")
ctag = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
)
func init() {
flag.Parse()
}
func main() {
conn, err := amqp.Dial(*uri)
if err != nil {
log.Fatalf("dial: %v", err)
return
}
channel, err := conn.Channel()
if err != nil {
log.Fatalf("channel: %v", err)
return
}
queue, err := channel.QueueDeclare(*queue, true, false, false, false, nil)
if err != nil {
log.Fatalf("queue Declare: %v", err)
return
}
if err := channel.Qos(5, 0, true); err != nil {
log.Fatalf("qos: %v", err)
return
}
deliveries, err := channel.Consume(queue.Name, *ctag, false, false, false, false, nil)
if err != nil {
log.Fatalf("queue Consume: %v", err)
return
}
done := make(chan error)
for i := 0; i < 10; i++ {
go handle(i, deliveries, done)
}
if err := <- done; err != nil {
log.Fatalf("error during message consumption: %v", err)
}
}
func handle(i int, deliveries <-chan amqp.Delivery, done chan error) {
for d := range deliveries {
log.Printf("[%d] got %dB delivery: [%v] %q", i, len(d.Body), d.DeliveryTag, d.Body)
if err := d.Ack(false); err != nil {
done <- err
}
}
done <- nil
}
The issue lies with the message acknowledgement. The boolean passed to Ack()
is set to true
the current and all prior unacknowledged messages will be acknowledged. With concurrency in place, this can lead to a situation where a worker tries to acknowledge a message which was already acknowledged by another worker. This occurs if worker A is processing message 1 and worker B is processing message 2 and worker B finishes before A. Once worker A tries to acknowledge its message, the error occurs.
To fix this, the boolean in https://github.com/corvus-ch/rabbitmq-cli-consumer/blob/master/delivery/delivery.go#L28 and https://github.com/corvus-ch/rabbitmq-cli-consumer/blob/master/delivery/delivery.go#L33 needs to be changed to false.
@corvus-ch Yay! Nice job tracking this down!
Docu: https://github.com/streadway/amqp/blob/master/delivery.go#L113-L115
I'll add the adjustments.
Hi all - I also needed an option for message processing concurrency, and wanted to implement it using multiple RabbitMQ channels to be as close as possible to how our project handles RabbitMQ in other languages (we have consumers in node and python, as well as PHP). This seems to be the canonical way to handle multiple concurrent consumers in a single client in RabbitMQ.
@corvus-ch What is the intent of the mutex in processor.Process? After further testing, my multi-channel solution is fetching messages in parallel, but only processing one at a time because each call has to wait for the mutex to free up. The simple solution is to remove the mutex, but that could obviously cause issues that I'm not seeing. The more complicated solution would be to have a separate processor per channel, but I don't want to pursue that solution if there's a simpler one.
@corvus-ch I think I answered my own question - Processor had a single os.Cmd that it was re-using for each new call. I just pushed an update to my PR that uses a new os.Cmd for each call to Process instead.
What's the limit on concurrency? If we have 10k messages in the queue, will this spin-up 10k PHP processes at the same time?