apache / pulsar-client-go

Apache Pulsar Go Client Library
https://pulsar.apache.org/
Apache License 2.0
652 stars 335 forks source link

Possible data race in internalFlush #1258

Closed Gilthoniel closed 2 months ago

Gilthoniel commented 2 months ago

Expected behavior

When calling Flush or FlushWithCtx, enqueued messages should all be sent.

Actual behavior

When flushing, switching to a new channel can lead to a message loss:

    if len(p.dataChan) != 0 {
        oldDataChan := p.dataChan
        p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages)
        for len(oldDataChan) != 0 {
            pendingData := <-oldDataChan
            p.internalSend(pendingData)
        }
    }

If internalSendAsync is sending the request to the channel at the same time as the switch, it may happen that the length will be zero while flushing, and then become one so the message will be stuck in the channel.

Steps to reproduce

I never actually observed this in practice but I noticed this bug while reading the code for a different issue. I'm confident this can happen but I'd like your opinion.

System configuration

Pulsar version: v3.0.5

Gilthoniel commented 2 months ago

A different approach would be something like that:

// flushDataChan first empties the data channel as much as possible, then send
// the different pending requests.
func (p *partitionProducer) flushDataChan() {
    var reqs []*sendRequest

    for {
        select {
        case pendingData := <-p.dataChan:
            reqs = append(reqs, pendingData)
        default:
            for _, req := range reqs {
                p.internalSend(req)
            }
                        return
        }
    }
}

It would also avoid the channel allocation for every flush in high load scenarios.

gunli commented 2 months ago

Hmm, internalSendAsync() is called by Send() or SendAsync(), IMO, you should not call Flush() and Send()/SendAsync() at the same time, I think it should be a convention.