nsqio / go-nsq

The official Go package for NSQ
MIT License

producer: avoid sending duplicate data #217

Open ghost opened 7 years ago

ghost commented 7 years ago
for {
    select {
    case t := <-w.transactionChan:
        w.transactions = append(w.transactions, t)
        err := w.conn.WriteCommand(t.cmd)
        if err != nil {
            w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
            w.close()
        }
    case data := <-w.responseChan:
        w.popTransaction(FrameTypeResponse, data)
    case data := <-w.errorChan:
        w.popTransaction(FrameTypeError, data)
    case <-w.closeChan:
        goto exit
    case <-w.exitChan:
        goto exit
    }
}

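After the producer calls Stop, there may still be data waiting on w.responseChan while w.exitChan has already been closed. Because select picks pseudo-randomly among the ready cases, the router may take the exitChan case and goto exit without reading that response.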
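A minimal sketch of the select behavior this relies on, using local channels named after the producer's fields (countChoices is a hypothetical helper, not part of go-nsq). The response channel is buffered here only so a value is ready without a sender goroutine; in the producer it is unbuffered. When several cases are ready, the Go spec says select chooses one by uniform pseudo-random selection, so both branches should show up over many trials:

```go
package main

import "fmt"

// countChoices recreates the situation above: a response is ready and
// exitChan is already closed, so both select cases can proceed.
// It counts which case select picks over many trials.
func countChoices(trials int) (responses, exits int) {
	for i := 0; i < trials; i++ {
		// Buffered only so the value is ready without a goroutine (assumption of this model).
		responseChan := make(chan []byte, 1)
		responseChan <- []byte("OK")
		exitChan := make(chan struct{})
		close(exitChan)

		select {
		case <-responseChan:
			responses++
		case <-exitChan:
			exits++ // the router would goto exit and never read the response
		}
	}
	return responses, exits
}

func main() {
	r, e := countChoices(10000)
	fmt.Printf("responseChan chosen %d times, exitChan chosen %d times\n", r, e)
}
```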

Then, in transactionCleanup():

func (w *Producer) transactionCleanup() {
    // clean up transactions we can easily account for
    for _, t := range w.transactions {
        t.Error = ErrNotConnected
        t.finish()
    }
    w.transactions = w.transactions[:0]
    ... ...
}

If we don't read responseChan first, a transaction whose data was actually delivered is failed with ErrNotConnected. If the caller then creates a new producer and publishes again, the same data is sent twice.

mreiferson commented 7 years ago

@zwb-ict great catch! I think this might explain the long-standing bug in #171! ⭐️

mreiferson commented 7 years ago

@zwb-ict I've looked into this a bit more, and I don't think the race condition you've identified here is possible. The underlying connection only calls the OnClose delegate (which calls close(w.closeChan)) after fully cleaning up the connection (see https://github.com/nsqio/go-nsq/blob/master/conn.go#L694). Since w.responseChan and w.errorChan are both unbuffered channels, no message can still be sitting on them once w.closeChan is closed, because that only happens after the connection has finished reading.
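The handoff guarantee this argument relies on can be shown with a small model. deliverThenClose is a hypothetical helper: one goroutine plays the conn's read loop and sends on an unbuffered responseChan, then closes closeChan (standing in for the OnClose delegate). Because a send on an unbuffered channel completes only once the receiver has taken the value, the router has always consumed every response before closeChan is closed:

```go
package main

import "fmt"

// deliverThenClose models the normal disconnect path: a reader goroutine
// hands n responses to the router over an unbuffered channel and only then
// closes closeChan. It returns how many responses the router consumed.
func deliverThenClose(n int) int {
	responseChan := make(chan []byte) // unbuffered: a send completes only when received
	closeChan := make(chan struct{})

	go func() {
		for i := 0; i < n; i++ {
			responseChan <- []byte("OK")
		}
		close(closeChan) // runs only after every send has been received
	}()

	received := 0
	for {
		select {
		case <-responseChan:
			received++
		case <-closeChan:
			return received
		}
	}
}

func main() {
	fmt.Println(deliverThenClose(100)) // prints 100
}
```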

ghost commented 7 years ago

@mreiferson Is it possible when the user calls Stop?

func (w *Producer) Stop() {
    w.guard.Lock()
    if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
        w.guard.Unlock()
        return
    }
    w.log(LogLevelInfo, "stopping")
    close(w.exitChan)
    w.close()
    w.guard.Unlock()
    w.wg.Wait()
}

close(w.exitChan) is called before w.close() (which will call w.conn.close()).

So the router may leave its for loop through the case <-w.exitChan branch:

func (w *Producer) router() {
    for {
        select {
        case t := <-w.transactionChan:
            w.transactions = append(w.transactions, t)
            err := w.conn.WriteCommand(t.cmd)
            if err != nil {
                w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
                w.close()
            }
        case data := <-w.responseChan:
            w.popTransaction(FrameTypeResponse, data)
        case data := <-w.errorChan:
            w.popTransaction(FrameTypeError, data)
        case <-w.closeChan:
            goto exit
        case <-w.exitChan:
            goto exit
        }
    }

exit:
    w.transactionCleanup()
    w.wg.Done()
    w.log(LogLevelInfo, "exiting router")
}

Before the producer's router goroutine runs w.transactionCleanup(), the conn's goroutine may still pass data into w.responseChan or w.errorChan, because w.conn.close() has not been called yet.
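A rough model of this Stop path, in contrast to the OnClose path: here exitChan is closed independently of the conn goroutine, so nothing forces the pending response to be read first. stopRace and connClosed are hypothetical names; the sender's select on connClosed stands in for the connection being torn down by w.close() afterwards, which also keeps the model from leaking goroutines. Over many trials the router should sometimes exit without reading the response:

```go
package main

import "fmt"

// stopRace models one Stop() call. It reports whether the router exited
// without reading the response the conn goroutine was trying to deliver.
func stopRace() (lost bool) {
	responseChan := make(chan []byte) // unbuffered, as in the producer
	exitChan := make(chan struct{})
	connClosed := make(chan struct{}) // hypothetical: signals the conn was torn down
	senderDone := make(chan struct{})

	// Stand-in for the conn goroutine delivering a response.
	go func() {
		defer close(senderDone)
		select {
		case responseChan <- []byte("OK"):
		case <-connClosed: // connection closed; the response is dropped
		}
	}()

	close(exitChan) // Stop(): close(w.exitChan) happens before w.close()

	select {
	case <-responseChan:
		lost = false
	case <-exitChan:
		lost = true // router goes to exit; cleanup would fail this transaction
	}

	close(connClosed) // Stop(): w.close() tears the connection down afterwards
	<-senderDone
	return lost
}

func main() {
	const trials = 1000
	lost := 0
	for i := 0; i < trials; i++ {
		if stopRace() {
			lost++
		}
	}
	fmt.Printf("response dropped in %d of %d trials\n", lost, trials)
}
```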

ghost commented 7 years ago

@mreiferson Below https://github.com/nsqio/go-nsq/blob/master/conn.go#L536, should we add a goto exit like the one at https://github.com/nsqio/go-nsq/blob/master/conn.go#L508?

mreiferson commented 7 years ago

> Is it possible when the user calls Stop?

@zwb-ict yes, it looks like in that case the race would be possible.

> Below https://github.com/nsqio/go-nsq/blob/master/conn.go#L536, should we add a goto exit like the one at https://github.com/nsqio/go-nsq/blob/master/conn.go#L508?

Hmmm, possibly, but let's not do that in this PR?

ghost commented 7 years ago

@mreiferson yes, let's not do that in this PR.