nsqio / go-nsq

The official Go package for NSQ
MIT License

consumer: redistributeRDY fails when connection remains active #179

Closed meAmidos closed 7 years ago

meAmidos commented 8 years ago

While experimenting with NSQ I've encountered a problem that looks like a flaw in the consumer code.

My setup is:

It turns out that with this setup the consumer reads from only one of the two nsqd instances. The second nsqd never gets a chance to send anything, because the consumer constantly keeps RDY = 0 on the connection to it.

I found that the code has special handling for such cases: every once in a while the consumer tries to randomly redistribute RDY among the connected nsqd nodes (method redistributeRDY). In practice, however, it never works because of this condition in the code:

availableMaxInFlight := int64(maxInFlight) - atomic.LoadInt64(&r.totalRdyCount)
...
for len(possibleConns) > 0 && availableMaxInFlight > 0 {
<redistribution logic>
}

Debugging shows that availableMaxInFlight always equals 0 at this point, so the actual redistribution never happens.
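To make the arithmetic concrete, here is a minimal sketch of the budget check. It is not the go-nsq implementation: the `conn` type and both functions are hypothetical, and the scenario (max-in-flight of 1, with the single RDY held by the first of two nsqd connections) is an assumption chosen for illustration.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for one nsqd connection; only the RDY
// count the consumer has granted to it matters here.
type conn struct {
	addr string
	rdy  int64
}

// availableMaxInFlight mirrors the quoted expression: the max-in-flight
// budget minus the sum of RDY counts already handed out.
func availableMaxInFlight(maxInFlight int64, conns []conn) int64 {
	var total int64
	for _, c := range conns {
		total += c.rdy
	}
	return maxInFlight - total
}

// redistribute gives RDY = 1 to starved connections while budget remains
// and returns how many connections it updated. It never takes RDY away
// from a connection that already holds it (assumed simplification).
func redistribute(maxInFlight int64, conns []conn) int {
	available := availableMaxInFlight(maxInFlight, conns)
	updated := 0
	for i := range conns {
		if available <= 0 {
			break
		}
		if conns[i].rdy == 0 {
			conns[i].rdy = 1
			available--
			updated++
		}
	}
	return updated
}

func main() {
	// Assumed scenario: nsqd-1 is busy and holds the whole budget.
	conns := []conn{{"nsqd-1", 1}, {"nsqd-2", 0}}
	fmt.Println(availableMaxInFlight(1, conns)) // 0
	fmt.Println(redistribute(1, conns))         // 0
}
```

As long as the busy connection keeps its RDY, the budget is fully spent before the loop starts, so the starved connection is never reached.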

In fact, even if I hack the code a little and force this redistribution to happen, it does not work either, because there is a similar condition in method updateRDY. When there is no available RDY, this method sets up a timer to retry in 5 seconds; that retry always fails too, so the timer is rescheduled again and again.

So, the questions are:

mreiferson commented 8 years ago

@meAmidos thanks for the detailed report!

I just refreshed my memory of this code and I think you might have found a bug. Yes, go-nsq strictly enforces max-in-flight, so we do expect the consumer to perform "redistribution". However, the redistribution logic currently assumes that one of the connections will (eventually) be idle. In your case it never is, since there is a steady incoming stream of messages from each nsqd. This is the problematic code.

judwhite commented 8 years ago

This looks similar to an issue I attempted to address in the C# client, discussion here. The solution isn't optimal and requires a flag to be set, because with the current implementation it will exceed max-in-flight while it floats RDY counts around looking for another nsqd.

mreiferson commented 7 years ago

see #208