streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Weird consumer behavior #89

Closed: falzm closed this issue 10 years ago

falzm commented 10 years ago

Hi

I'm sorry to open an issue for what seems to be a misunderstanding on my side, but I'd like to be sure. I'm seeing some weird behavior when using your AMQP client package: https://gist.github.com/falzm/8144509

Here is how I use this code:

i=0 ; while true; do
  amqp-publish -u amqp://rabbitmq.local:5672/%2f -e amq.direct -r plop -C text/plain -b "blah $i"
  sleep 1s
  i=$(($i + 1))
done
marc@cobalt:~% ./amqp_consumer 
2013/12/27 10:17:34 consumer: connected to broker
2013/12/27 10:17:34 consumer: declared exchange amq.direct (direct)
2013/12/27 10:17:34 consumer: bound queue "plop" matching key "plop" to exchange "amq.direct"
2013/12/27 10:17:34 consumer: start consuming from queue "plop"
2013/12/27 10:17:36 received message: blah 0
2013/12/27 10:17:37 received message: blah 1
2013/12/27 10:17:38 received message: blah 2
2013/12/27 10:17:39 received message: blah 3
2013/12/27 10:17:40 received message: blah 4

...and so on.

This code works as intended if lines 130 to 133 (https://gist.github.com/falzm/8144509#file-amqp_consumer-go-L130-L133) are uncommented (enabled), but not if they are commented out (disabled), as currently shown in the gist: the connection is established and the consumer appears to be correctly bound to its queue, but no messages are displayed. The messages do reach the consumer (the RabbitMQ dashboard shows them being dispatched from the queue to the consumer), yet it seems nothing comes through the delivery channel, so nothing is displayed after the message "consumer: start consuming from queue "plop"".

Do you have any idea of what I could have done wrong? Let me know if you need more info/details.

Regards,

m.

streadway commented 10 years ago

I recommend that you build this example with "go build -race" and work through any errors. My guess is that since you're reading from and writing to .messages from different goroutines without any synchronization, the behavior of this program is undefined.

One way you can work with multiple consumers, and get reconnect for free, is to isolate your concerns into individual goroutines and use chans to pass ownership between them. Here is an (untested) example of a dispatcher that starts N consumers per channel (1:N).

Something like:

func work(messages <-chan amqp.Delivery) {
  for msg := range messages {
    fmt.Println(string(msg.Body))
  }
}

func main() {
  channels := make(chan *amqp.Channel)

  // dispatcher (could be a func too... I like my goroutines to have a name)
  go func() {
    for channel := range channels {
      for i := 0; i < runtime.NumCPU(); i++ {
        messages, _ := channel.Consume(...) // Consume also returns an error, elided in this sketch
        go work(messages)
      }
    }
  }()

  for {
    // dial the connection
    // create a channel
    // declare exchanges
    // declare queues
    // bind queues

    // channel is now ready to be consumed
    channels <- channel
  }
}

If you need to signal a shutdown, have the dispatcher goroutine, once it has completed its task of starting the workers, send the new consumer to the shutdown signaler over a different channel.
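
An untested sketch of that idea, with illustrative names (consumerRef, shutdownSignaler, and the consumers and shutdown chans are mine, not part of this package; error handling elided):

type consumerRef struct {
  channel *amqp.Channel
  tag     string
}

func shutdownSignaler(consumers <-chan consumerRef, shutdown <-chan struct{}) {
  var started []consumerRef
  for {
    select {
    case c := <-consumers:
      started = append(started, c)
    case <-shutdown:
      for _, c := range started {
        // Cancel stops new deliveries; each worker then drains its
        // chan amqp.Delivery until the package closes it.
        c.channel.Cancel(c.tag, false)
      }
      return
    }
  }
}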

You ought not to check for a running flag, because there may be messages in flight that need to be fully consumed from the chan amqp.Delivery returned by amqp.Channel.Consume. These consumer chans are closed by the package after all messages have been delivered, so the intent is to 'range' over them until they are done.

If you do not fully consume from all chan amqp.Delivery chans, the package will block, as this is the means of providing pushback to the server for proper flow control under consumer load.
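
Concretely, a consuming loop in that spirit looks like this (a sketch; the queue name comes from your example, the consumer tag and autoAck = false are arbitrary choices):

deliveries, err := channel.Consume(
  "plop",        // queue (from your example)
  "my-consumer", // consumerTag (arbitrary)
  false,         // autoAck
  false,         // exclusive
  false,         // noLocal
  false,         // noWait
  nil,           // args
)
if err != nil {
  log.Fatal(err)
}
for msg := range deliveries {
  fmt.Println(string(msg.Body))
  msg.Ack(false) // needed because autoAck is false
}
// range only ends once the package closes the chan (e.g. after Channel.Cancel
// or a connection close) and every buffered delivery has been received.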

Hope this helps.

falzm commented 10 years ago

Sean

You're right, there is indeed a race condition:

marc@cobalt:~% ./amqp_consumer 
2013/12/27 13:49:12 consumer: connected to broker
2013/12/27 13:49:12 consumer: declared exchange amq.direct (direct)
2013/12/27 13:49:12 consumer: bound queue "plop" matching key "plop" to exchange "amq.direct"
2013/12/27 13:49:12 consumer: starting consumer
==================
WARNING: DATA RACE
Write by goroutine 10:
  main.amqpStartConsumer()
      /home/marc/amqp_consumer.go:199 +0x14a
  main.func·002()
      /home/marc/amqp_consumer.go:155 +0x3d1

Previous read by goroutine 11:
  main.func·004()
      /home/marc/amqp_consumer.go:169 +0x65

Goroutine 10 (running) created at:
  main.main()
      /home/marc/amqp_consumer.go:165 +0xd6d

Goroutine 11 (running) created at:
  main.main()
      /home/marc/amqp_consumer.go:178 +0xd7b
==================
2013/12/27 13:49:12 consumer: start consuming from queue "plop"
^C
2013/12/27 13:49:12 received termination signal
2013/12/27 13:49:12 consumer: stopped consuming from queue "plop"
2013/12/27 13:49:12 consumer: disconnecting from broker
2013/12/27 13:49:12 consumer: exiting
Found 1 data race(s)

Does this mean that I'm trying to read from the delivery channel before, or at the same moment as, it is created? I had a quick look at the page you linked; could using a lock solve the race condition?

However, regarding your suggestion of using multiple consumers: I don't need that at the moment. What I'm trying to do is to be able to pause consuming when a certain number of "worker" goroutines are running, hence the "control" goroutine (https://gist.github.com/falzm/8144509#file-amqp_consumer-go-L135-L165). I'm not sure that my method is correct; do you have any insights about this?

Thank you for your help,

m.

streadway commented 10 years ago

For that race, the simple fix is to only use chans to share state or state ownership between goroutines. The longer I code with Go, the more I see the use of sync.Mutex as revealing a design flaw by sharing state rather than communicating over chans.

Also, I just noticed that the consumerTag should be unique across all consumers on a Channel. This means you should generate your own consumerTag if you're starting and canceling consumers.
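
One simple way to do that (just a sketch; the tag format is an arbitrary choice, using os and sync/atomic):

var consumerSeq uint64

func uniqueConsumerTag() string {
  return fmt.Sprintf("ctag-%d-%d", os.Getpid(), atomic.AddUint64(&consumerSeq, 1))
}

// then, when starting a consumer:
// deliveries, err := channel.Consume(queueName, uniqueConsumerTag(), ...)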

I'm not certain you ought to use the global counter of running goroutines to manage your degree of parallelism. Other packages may be creating goroutines that will count against your algorithm. runtime.NumGoroutine() is good for profiling and leak checking, but isn't very useful for capping concurrency. It's perfectly fine for some applications to have hundreds of thousands of goroutines.

What you likely want is a buffered channel used as a semaphore, keeping the number of concurrent workers at your desired value, local to the pool of work you wish to limit.

To do this I use something similar to:

const maxConcurrency = 10
running := make(chan bool, maxConcurrency) // buffered chan used as a semaphore
for something := range somethings {
  running <- true // take a slot; blocks once maxConcurrency workers are busy
  go func() {
    // work with something
    <-running // free the slot when done
  }()
}

Or you can use a model similar to the "leaky bucket" described in Effective Go, where your control goroutine creates "leases" which you take before starting work and restore when complete.
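
A rough, untested sketch of that lease variant (reusing the maxConcurrency and somethings placeholders from above; the names are illustrative):

leases := make(chan struct{}, maxConcurrency)

// the control goroutine creates the leases up front
for i := 0; i < maxConcurrency; i++ {
  leases <- struct{}{}
}

for something := range somethings {
  <-leases // take a lease before starting work
  go func() {
    // work with something
    leases <- struct{}{} // restore the lease when complete
  }()
}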

What exactly would you like to accomplish?

falzm commented 10 years ago

You're right, using channels as a means to synchronize goroutines is more idiomatic, and thus probably the best way to go (pun intended). I'll look into this.

Basically, what I'm trying to accomplish is to de-queue jobs (shell commands to be executed, with their output sent back to the sender on a "results" queue) from a RabbitMQ queue, but to pause when a certain number of processes (one per goroutine) are running. My initial take was to toggle consuming with Channel.Consume()/Channel.Cancel() when crossing this limit in either direction. After reading your hints, I wonder if it's possible to achieve the same goal without Cancel()-ing the AMQP consumer, and instead simply stop reading from the delivery Go channel when the maximum allowed concurrency is reached: if I pause looping over the delivery channel, because I'm blocked on a full buffered Go channel as you suggested, will the AMQP consumer still de-queue messages from the RabbitMQ queue? I'd like it not to, if possible.

Besides, you suggested using several consumers in my process: is there any benefit to doing this? I wouldn't be able to spawn more message-processing goroutines anyway.

Thanks,

m.

streadway commented 10 years ago

It sounds like Channel.Qos(maxConcurrency, 0) is what you're looking for. When you consume with autoAck = false, the server will only send you prefetchCount number of messages until it receives an acknowledgment for the messages it has delivered.

So it would be a similar consumer loop but with an Ack when you have completed your work (either success or fail).

This is how I'd implement what you described:

func runJob(payload []byte) (yourResultType, error) {
    ... // unit testable
}

func publishResult(channel *amqp.Channel, res yourResultType, err error) {
    ... // unit testable if you use an interface type for Channel.Publish(...)
}

func work(channel *amqp.Channel, msg amqp.Delivery) {
    res, err := runJob(msg.Body)
    publishResult(channel, res, err)
    msg.Ack(false) // false: acknowledge only this delivery, not all prior ones
}

func main() {
    const maxConcurrency = 10

    for {
        // dial the connection
        // create a channel
        channel.Qos(maxConcurrency, 0, false) // only receive N messages per channel until ack'd.
        // declare exchanges
        // declare queues
        // bind queues
        // start a single consumer but work in parallel:
        // messages, _ := channel.Consume(...)
        for msg := range messages {
            go work(channel, msg)
        }
    }
}

falzm commented 10 years ago

Sounds like a plan! I'll give your idea a try, thanks again for your time :)

m.

streadway commented 10 years ago

:) you're welcome.