nsqio / go-nsq

The official Go package for NSQ

Question about concurrency and Max in Flight #221

Closed joaodlf closed 6 years ago

joaodlf commented 6 years ago

Hi, this isn't so much of an issue but more of a support question.

I come from a Kafka background, where we managed rate limiting via channels and goroutines. It's nice to see these concepts already handled by default in the go-nsq library, but I'm having some trouble understanding how all of this actually works.

If I set the following:

cfg.MaxInFlight = 200

I expect this to mean "I can consume up to 200 messages at a time". Is this correct? If so, I don't understand normal Handler operation:

c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        log.Println(string(message.Body))
        time.Sleep(time.Second * 2)
        return nil
}))

With the Sleep in place, I expected to be able to consume up to 200 messages in a very short period of time, but what I'm actually seeing is:

2017/12/19 19:04:04 hello world
2017/12/19 19:04:06 hello world
2017/12/19 19:04:08 hello world
2017/12/19 19:04:10 hello world
2017/12/19 19:04:12 hello world
2017/12/19 19:04:14 hello world
2017/12/19 19:04:16 hello world

Each message is processed at 2-second intervals, so what is MaxInFlight actually doing here?

I considered using AddConcurrentHandlers, but this just added to my confusion. Setting the concurrency parameter to 1 gives the same behaviour as AddHandler. Raising the concurrency limit of course means there are more goroutines waiting to consume, which in turn means the messages are consumed at the desired rate. But I still don't understand what MaxInFlight is then being used for.

Should I be using goroutines inside the HandlerFunc? When should I be using AddConcurrentHandlers?

Thanks for your time and patience.

ploxiln commented 6 years ago

As you noticed, you'll have to call something like AddConcurrentHandlers(handler, 200) to get your desired effect. But you'll still need to set MaxInFlight as well.

MaxInFlight is sort of comparable to the TCP "window size": it controls how many messages nsqd will send to the consumer before hearing back about any of them. It often makes sense for it to be higher than the number of concurrent handler goroutines: those goroutines will be idle during the round-trip time between the consumer and the nsqd, and if they're CPU-bound then having way more of them than you have cores just slows down message processing. There's also the issue of "spreading" the MaxInFlight across all the nsqd that the consumer connects to.

Note that the other first-class nsq client library, pynsq, only has max_in_flight and no separate concurrency number ... it's a more pure event-loop thing, and works a bit differently.

joaodlf commented 6 years ago

Thanks! That makes a bit more sense to me now.

shaocongliang commented 5 years ago

This option seems to make sense only when concurrency is high. In one situation I encountered, with maxInFlight set to 8, it took 5 seconds to process 10,000 requests (200 goroutines, 500 requests per goroutine). On the other hand, it took 2 seconds to process 10 requests (1 goroutine).

In short: it slows down message processing at low concurrency and optimizes message processing at high concurrency.
