nsqio / go-nsq

The official Go package for NSQ
MIT License

consumer: redistribute RDY when connections are active #208

Closed mreiferson closed 7 years ago

mreiferson commented 7 years ago

fixes #179

mreiferson commented 7 years ago

RFR @jehiah @judwhite @ploxiln

This is one simple way to address #179 by bounding the time a consumer will stay pinned to any producing nsqd, regardless of last message delivery.
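To make the idea concrete, here is a minimal sketch of such a bound. It is not the diff from this PR: the `conn` struct, `shouldRelease`, and the `idleTimeout` / `maxPinned` parameters are hypothetical names chosen for illustration. The assumption is that a connection gives up its RDY either when it has been idle too long or when it has simply held RDY for longer than the cap, even if messages are still arriving.

```go
package main

import (
	"fmt"
	"time"
)

// conn is a hypothetical stand-in for a consumer connection to one nsqd.
type conn struct {
	rdy         int64
	lastMessage time.Time // when a message was last received
	lastRDY     time.Time // when a non-zero RDY was last granted
}

// shouldRelease reports whether the RDY held by c should be reset to 0 so it
// can be redistributed. The second check is the bound discussed above: it
// fires regardless of how recently a message was delivered.
func shouldRelease(c conn, now time.Time, idleTimeout, maxPinned time.Duration) bool {
	if c.rdy == 0 {
		return false // nothing to release
	}
	if now.Sub(c.lastMessage) > idleTimeout {
		return true // idle: no recent messages
	}
	return now.Sub(c.lastRDY) > maxPinned // active, but pinned too long
}

// example evaluates two active connections: one that has held RDY for 45s
// and one that was granted RDY 5s ago. Both received a message just now.
func example() (longPinned, freshlyGranted bool) {
	now := time.Unix(1000, 0)
	idleTimeout, maxPinned := 10*time.Second, 30*time.Second

	old := conn{rdy: 1, lastMessage: now, lastRDY: now.Add(-45 * time.Second)}
	fresh := conn{rdy: 1, lastMessage: now, lastRDY: now.Add(-5 * time.Second)}

	return shouldRelease(old, now, idleTimeout, maxPinned),
		shouldRelease(fresh, now, idleTimeout, maxPinned)
}

func main() {
	longPinned, freshlyGranted := example()
	fmt.Println("release long-pinned conn:", longPinned)
	fmt.Println("release freshly granted conn:", freshlyGranted)
}
```

Without the second check, a connection that keeps receiving messages would never be released, which is the pinning described in #179.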

ploxiln commented 7 years ago

Looks good to me.

I noticed this part below seems less than optimal (abbreviated):

    for len(possibleConns) > 0 && availableMaxInFlight > 0 {
        availableMaxInFlight--
        i := r.rng.Int() % len(possibleConns)
        c := possibleConns[i]
        possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)
        r.updateRDY(c, 1)
    }

All conns are added to possibleConns, so it's quite possible that some of them already have a non-zero ready count. In that case some of availableMaxInFlight is spent on conns that already had RDY and so is not actually distributed. (But this problem will probably fix itself in later redistributions. And it's not new.)

This could be fixed by appending to possibleConns only if the conn has a zero ready count.
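A rough sketch of that suggestion, using a hypothetical `conn` struct and a standalone `redistribute` function rather than the real consumer types (the `c.rdy = 1` assignment stands in for `r.updateRDY(c, 1)`):

```go
package main

import (
	"fmt"
	"math/rand"
)

// conn is a hypothetical stand-in for a consumer connection; only the
// ready count matters for this sketch.
type conn struct {
	name string
	rdy  int64
}

// redistribute hands out up to availableMaxInFlight RDY credits, one per
// connection, picking randomly among connections whose ready count is zero.
// It returns how many credits were actually handed out.
func redistribute(rng *rand.Rand, conns []*conn, availableMaxInFlight int) int {
	possibleConns := make([]*conn, 0, len(conns))
	for _, c := range conns {
		if c.rdy == 0 { // skip conns that already hold a ready count
			possibleConns = append(possibleConns, c)
		}
	}

	distributed := 0
	for len(possibleConns) > 0 && availableMaxInFlight > 0 {
		availableMaxInFlight--
		i := rng.Intn(len(possibleConns))
		c := possibleConns[i]
		possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)
		c.rdy = 1 // stands in for r.updateRDY(c, 1)
		distributed++
	}
	return distributed
}

// runExample distributes 2 credits across three conns, one of which
// already has a non-zero ready count.
func runExample(seed int64) (int, []*conn) {
	rng := rand.New(rand.NewSource(seed))
	conns := []*conn{{"a", 1}, {"b", 0}, {"c", 0}}
	return redistribute(rng, conns, 2), conns
}

func main() {
	n, conns := runExample(1)
	fmt.Println("distributed:", n)
	for _, c := range conns {
		fmt.Printf("%s rdy=%d\n", c.name, c.rdy)
	}
}
```

With the filter in place, a credit is only ever spent on a conn that had none, so the budget is not used up on conns that already have RDY.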

mreiferson commented 7 years ago

> All conns are added to possibleConns, so it's quite possible that some of them already have a non-zero ready count. So some of availableMaxInFlight will end up not actually distributed.

Hmm, yea, can probably be improved, but it's at least randomized.

mreiferson commented 7 years ago

@ploxiln want to gimme a 👍 — also, there are probably a few lessons to learn from nsqio/pynsq#179 and nsqio/pynsq#176

ploxiln commented 7 years ago

👍 to these additions.

... but the existing rdyCount logic is a bit tricky, and I'm not really familiar with it, and it's taking a while to digest. Anyway, I'm confident that this is strictly an improvement. fwiw :)