nsqio / go-nsq

The official Go package for NSQ
MIT License

Frequently meet "Broken pipe error", if HandleMessage function is blocked #308

Closed umialpha closed 4 years ago

umialpha commented 4 years ago

Error Msg

IO error - write tcp [::1]:53277->[::1]:4150: i/o timeout

Scenario

Our server acts as an NSQ consumer. It fetches messages from NSQ into an in-memory buffer (if the buffer is full, the handler blocks), disables auto-response on each message, then dispatches the messages to the backend workers. When a worker finishes a message, it sends an ACK to our server, and our server then sends the FIN command to NSQ.
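The ACK-then-FIN step described above could be sketched roughly as follows. This is only an illustration under assumptions: `finisher` is a hypothetical stand-in for `*nsq.Message` (which does have a `Finish()` method), and `pending`, `Track`, `Ack`, and `fakeMsg` are made-up names, not part of go-nsq or of the server discussed here.

```go
package main

import "sync"

// finisher is a hypothetical stand-in for *nsq.Message; only Finish is needed.
type finisher interface {
	Finish()
}

// pending tracks messages that were dispatched to workers but not yet ACKed.
type pending struct {
	mu   sync.Mutex
	msgs map[string]finisher
}

func newPending() *pending {
	return &pending{msgs: make(map[string]finisher)}
}

// Track records a dispatched message under its ID.
func (p *pending) Track(id string, m finisher) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.msgs[id] = m
}

// Ack is called when a worker reports completion. It sends FIN for the
// message and reports whether the ID was known.
func (p *pending) Ack(id string) bool {
	p.mu.Lock()
	m, ok := p.msgs[id]
	delete(p.msgs, id)
	p.mu.Unlock()
	if ok {
		m.Finish() // done outside the lock so a slow write does not hold it
	}
	return ok
}

// fakeMsg counts Finish calls so the sketch can be exercised without NSQ.
type fakeMsg struct{ finished int }

func (f *fakeMsg) Finish() { f.finished++ }
```

In a real consumer, `Track` would be called right after `DisableAutoResponse()`, and `Ack` from whatever code path receives the worker's ACK. A duplicate ACK returns false and sends nothing.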

Why not connect the workers to NSQ directly?

Because we are supposed to support 10k workers, and connecting that many workers to NSQ directly is a big challenge. Instead, we want to put existing load balancers in front of our servers.

Config

This is our test scenario: all the servers run on localhost.

What's More

I don't know whether our scenario is good practice for NSQ. My first thought was to connect the workers to NSQ directly, which is simpler and more straightforward, but scalability is my major concern.

mreiferson commented 4 years ago

Hi @umialpha, can you provide the exact command lines / configuration for each process and an actual section of the logs for each process? Also, can you provide the source code for the worker?

umialpha commented 4 years ago

Hi @mreiferson, thanks for your response. I found some clues in other similar issues.

Our server has an in-memory message buffer. When it is full, the HandleMessage function blocks. About 30 seconds after it blocks, the broken pipe error occurs. Here is my sample handler code.

type handler struct {
    topic   string
    channel string
    ch      chan *nsq.Message
}

func (h *handler) HandleMessage(m *nsq.Message) error {
    if len(m.Body) == 0 {
        // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
        return nil
    }
    // The server sends FIN itself once a worker ACKs, so turn off the automatic response.
    m.DisableAutoResponse()
    h.ch <- m // ch is a buffered channel; this send blocks while the buffer is full
    return nil
}

So I guess blocking HandleMessage blocks the whole IO loop. However, the source code starts a goroutine per handler when AddConcurrentHandlers is called, so a blocked handler shouldn't block the IO loop. Please point out if I missed something.
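One way to reason about this question is a simplified, stdlib-only model. It is not go-nsq's actual implementation: `incoming`, `buf`, `stallTimeout`, and `acceptedBeforeStall` are hypothetical names, and the model assumes that a single feeder hands messages to the handler goroutines over an unbuffered channel. Each handler forwards into one shared buffered channel that nobody drains.

```go
package main

import "time"

// stallTimeout is how long the feeder waits for a free handler before
// concluding that every handler is stuck. The value is arbitrary.
const stallTimeout = 200 * time.Millisecond

// acceptedBeforeStall starts `handlers` goroutines that forward messages
// into a shared buffer of capacity bufSize, then tries to feed them `total`
// messages. It returns how many messages were handed over before the
// feeder could no longer find a handler ready to receive.
func acceptedBeforeStall(handlers, bufSize, total int) int {
	incoming := make(chan int)     // unbuffered hand-off from the feeder
	buf := make(chan int, bufSize) // shared in-memory buffer, never drained here
	done := make(chan struct{})
	defer close(done) // lets the handler goroutines exit when we return

	for i := 0; i < handlers; i++ {
		go func() {
			for {
				select {
				case m := <-incoming:
					select {
					case buf <- m: // blocks once buf is full
					case <-done:
						return
					}
				case <-done:
					return
				}
			}
		}()
	}

	accepted := 0
	for m := 0; m < total; m++ {
		select {
		case incoming <- m:
			accepted++
		case <-time.After(stallTimeout):
			return accepted // all handlers are blocked; the feeder is starved
		}
	}
	return accepted
}
```

Calling `acceptedBeforeStall(3, 2, 20)` returns 5 in this model: two messages fit in the buffer, three more are held by the blocked handlers, and the sixth hand-off never completes. Running handlers in separate goroutines raises the number of messages that can be absorbed, but it does not remove the stall.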

mreiferson commented 4 years ago

Ahh, that makes sense. This isn't something we will fix in go-nsq.

Despite using "concurrent" handlers that run in separate goroutines, if each of them writes to a single buffer (represented here by a Go channel), they will eventually block and starve the IO loop if that channel isn't consumed fast enough. This is a good thing, as it provides back pressure.
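If blocking inside HandleMessage is not acceptable for an application, one option (an assumption on my part, not something proposed in this thread) is a non-blocking hand-off that requeues the message when the buffer is full. The sketch below uses only the standard library: `requeuer` is a hypothetical stand-in for `*nsq.Message`, which has a `Requeue(delay time.Duration)` method, and `offer` and `countingMsg` are made-up names.

```go
package main

import "time"

// requeuer is a hypothetical stand-in for *nsq.Message exposing only the
// method this sketch needs.
type requeuer interface {
	Requeue(delay time.Duration)
}

// offer tries to place m into buf without blocking. If buf is full, it asks
// for the message to be redelivered after delay and reports false.
func offer(buf chan<- requeuer, m requeuer, delay time.Duration) bool {
	select {
	case buf <- m:
		return true
	default:
		m.Requeue(delay)
		return false
	}
}

// countingMsg counts Requeue calls so the sketch can be exercised without NSQ.
type countingMsg struct{ requeued int }

func (c *countingMsg) Requeue(delay time.Duration) { c.requeued++ }
```

A handler built this way would call `m.DisableAutoResponse()`, then `offer(h.ch, m, delay)`, and return nil in both cases, since the message has either been buffered or explicitly requeued. The trade-off is that rejected messages are redelivered later rather than held, so the delay and the consumer's in-flight limit would need tuning against the buffer size.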