nsqio / go-nsq

The official Go package for NSQ
MIT License
consumer: sendRDY error not propagating #199

Open armcknight opened 7 years ago

armcknight commented 7 years ago

Hello,

first of all apologies if my terminology is a bit off, I'm not a regular Go programmer :)

We run a process that reads from NSQ servers over an SSH tunnel. While debugging a problem that occurs when this connection breaks, we found that an error returned by sendRDY may not fully propagate.

sendRDY can return an error (https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L950-L964):

func (r *Consumer) sendRDY(c *Conn, count int64) error

updateRDY, which calls sendRDY, can also return an error (https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L907):

func (r *Consumer) updateRDY(c *Conn, count int64) error

But that error isn't handled in its own recursive call here (https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L940):

r.rdyRetryTimers[c.String()] = time.AfterFunc(5*time.Second,
    func() {
        r.updateRDY(c, count)
    })

We suspect that because the error doesn't fully propagate, our process never learns that the connection was lost and so doesn't know to attempt a mitigation.

We also found two other invocations of updateRDY that don't appear to handle errors. Both are in startStopContinueBackoff, which itself doesn't return an error (https://github.com/nsqio/go-nsq/blob/d71fb89c9e0263aaed89645b0b168e11ccf597b1/consumer.go#L761).

bmhatfield commented 7 years ago

For a little bit of extra context, this seems to require a pretty specific set of circumstances for us. When the tunnel drops, sometimes it's detected and a reconnect happens; other times we see this:

2016/11/01 10:19:07 ERR    1 [csym/create] (127.0.0.1:3001) IO error - write tcp 127.0.0.1:51178->127.0.0.1:3001: write: broken pipe
2016/11/01 10:19:07 ERR    1 [csym/create] (127.0.0.1:3001) error sending RDY 1 - write tcp 127.0.0.1:51178->127.0.0.1:3001: write: broken pipe

When we fall into this mode, we do not observe a reconnect. Even though the tunnel would eventually have come back up on its own, we'd still need to reinitialize the connection to NSQ.

mreiferson commented 7 years ago

@armcknight @bmhatfield thanks for the detailed info, I'll try to take a look at this!

mreiferson commented 7 years ago

I just poked around at this.

Although the errors returned by sendRDY / updateRDY aren't handled, conn.WriteCommand calls c.delegate.OnIOError, which calls conn.Close, which should then trigger the reconnect cycle.

The only reason it wouldn't is if messages are in flight and never "complete", meaning the rest of the cleanup logic doesn't execute. Relying on in-flight messages always completing is probably a poor assumption, though; perhaps we should bound the wait with some timeout.

Thoughts?

djmally commented 7 years ago

We resolved this recently on our end, and your explanation is pretty spot-on for what we were experiencing. We had messages still in flight when the RDY count was getting redistributed, which caused the connection with the in-flight messages to close prematurely. We fixed this by raising low_rdy_idle_timeout to 2 minutes in our NSQ client configuration.

I don't understand the inner workings of this package well enough to comment on whether there should be a timeout on this operation in the client, so I'll leave that up to you. Hopefully the way we resolved this internally helps in making that decision.
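
For readers who want to try the same workaround, the setting could be applied roughly as follows. This is a sketch rather than the configuration used above: it assumes the go-nsq Config type accepts the low_rdy_idle_timeout option through Set with a duration string, and it omits the consumer, topic, and channel setup.

```go
package main

import (
	"log"

	"github.com/nsqio/go-nsq"
)

func main() {
	cfg := nsq.NewConfig()

	// Raise the idle timeout used when RDY is redistributed across
	// connections. The value "2m" mirrors the 2 minutes mentioned above.
	if err := cfg.Set("low_rdy_idle_timeout", "2m"); err != nil {
		log.Fatalf("invalid config: %v", err)
	}

	log.Printf("low_rdy_idle_timeout = %s", cfg.LowRdyIdleTimeout)
}
```

The resulting cfg would then be passed to the consumer constructor in place of a default configuration.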