apache / pulsar-client-go

Apache Pulsar Go Client Library
https://pulsar.apache.org/
Apache License 2.0

Access to partitionConsumer state not thread-safe. #448

Open flowchartsman opened 3 years ago

flowchartsman commented 3 years ago

As part of my testing of the fix for #432, I have discovered a race condition in consumer_partition.go: the struct's state member is read and written concurrently without synchronization:

WARNING: DATA RACE
Read at 0x00c00039a918 by goroutine 9:
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).clearQueueAndGetNextMessage()
      /Users/me/pulsar-client-go/pulsar/consumer_partition.go:911 +0x68
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).clearReceiverQueue()
      /Users/me/pulsar-client-go/pulsar/consumer_partition.go:932 +0x84
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).grabConn()
      /Users/me/pulsar-client-go/pulsar/consumer_partition.go:861 +0x1312
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).reconnectToBroker()
      /Users/me/pulsar-client-go/pulsar/consumer_partition.go:802 +0x238
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop.func2()
      /Users/me/pulsar-client-go/pulsar/consumer_partition.go:711 +0xab

Previous write at 0x00c00039a918 by goroutine 70:
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).internalClose()
      /Users/me/pulsar-client-go/pulsar/consumer_partition.go:772 +0x68f
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
      /Users/me/pulsar-client-go/pulsar/consumer_partition.go:732 +0x29b

Goroutine 9 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).runEventsLoop()
      /Users/me/pulsar-client-go/pulsar/consumer_partition.go:704 +0x146

Goroutine 70 (finished) created at:
  github.com/apache/pulsar-client-go/pulsar.newPartitionConsumer()
      /Users/me/pulsar-client-go/pulsar/consumer_partition.go:184 +0xead
  github.com/apache/pulsar-client-go/pulsar.(*consumer).internalTopicSubscribeToPartitions.func1()
      /Users/me/pulsar-client-go/pulsar/consumer_impl.go:314 +0x7d3
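For what it's worth, the same class of race is easy to reproduce in isolation. The toy program below (names like toyConsumer are purely illustrative, not client code) trips the detector under go run -race with the same read-vs-write pattern shown above:

package main

import "time"

type toyState int32

const (
    toyReady toyState = iota
    toyClosed
)

// toyConsumer mimics the plain, unsynchronized state field on partitionConsumer.
type toyConsumer struct {
    state toyState
}

func main() {
    c := &toyConsumer{state: toyReady}

    // Writer, analogous to internalClose() setting consumerClosed.
    go func() {
        c.state = toyClosed
    }()

    // Reader, analogous to clearQueueAndGetNextMessage() checking readiness.
    go func() {
        _ = c.state == toyReady
    }()

    time.Sleep(100 * time.Millisecond)
}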
flowchartsman commented 3 years ago

Preliminary testing with go.uber.org/atomic indicates that this indeed eliminates the race in the referenced test.

diff --git a/go.mod b/go.mod
index bf0b627..2bb071f 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
    github.com/spf13/pflag v1.0.3 // indirect
    github.com/stretchr/testify v1.4.0
    github.com/yahoo/athenz v1.8.55
+   go.uber.org/atomic v1.7.0
 )

 replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
diff --git a/go.sum b/go.sum
index eaf7163..637ce54 100644
--- a/go.sum
+++ b/go.sum
@@ -154,6 +154,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
 github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 285cf29..54d2c5a 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -24,6 +24,7 @@ import (
    "time"

    "github.com/gogo/protobuf/proto"
+   "go.uber.org/atomic"

    "github.com/apache/pulsar-client-go/pulsar/internal"
    "github.com/apache/pulsar-client-go/pulsar/internal/compression"
@@ -31,9 +32,7 @@ import (
    "github.com/apache/pulsar-client-go/pulsar/log"
 )

-var (
-   lastestMessageID = LatestMessageID()
-)
+var lastestMessageID = LatestMessageID()

 type consumerState int

@@ -86,7 +85,7 @@ type partitionConsumer struct {

    // this is needed for sending ConsumerMessage on the messageCh
    parentConsumer Consumer
-   state          consumerState
+   state          atomic.Int32
    options        *partitionConsumerOpts

    conn internal.Connection
@@ -127,7 +126,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
    messageCh chan ConsumerMessage, dlq *dlqRouter,
    metrics *internal.TopicMetrics) (*partitionConsumer, error) {
    pc := &partitionConsumer{
-       state:                consumerInit,
        parentConsumer:       parent,
        client:               client,
        options:              options,
@@ -148,6 +146,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
        dlq:                  dlq,
        metrics:              metrics,
    }
+   pc.setState(consumerInit)
    pc.log = client.log.SubLogger(log.Fields{
        "name":         pc.name,
        "topic":        options.topic,
@@ -159,14 +158,16 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
    err := pc.grabConn()
    if err != nil {
        pc.log.WithError(err).Error("Failed to create consumer")
+       pc.nackTracker.Close()
        return nil, err
    }
    pc.log.Info("Created consumer")
-   pc.state = consumerReady
+   pc.setState(consumerReady)

    if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
        msgID, err := pc.requestGetLastMessageID()
        if err != nil {
+           pc.nackTracker.Close()
            return nil, err
        }
        if msgID.entryID != noMessageEntry {
@@ -174,6 +175,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon

            err = pc.requestSeek(msgID.messageID)
            if err != nil {
+               pc.nackTracker.Close()
                return nil, err
            }
        }
@@ -198,12 +200,13 @@ func (pc *partitionConsumer) Unsubscribe() error {
 func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
    defer close(unsub.doneCh)

-   if pc.state == consumerClosed || pc.state == consumerClosing {
+   pstate := pc.getState()
+   if pstate == consumerClosed || pstate == consumerClosing {
        pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
        return
    }

-   pc.state = consumerClosing
+   pc.setState(consumerClosing)
    requestID := pc.client.rpcClient.NewRequestID()
    cmdUnsubscribe := &pb.CommandUnsubscribe{
        RequestId:  proto.Uint64(requestID),
@@ -214,7 +217,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
        pc.log.WithError(err).Error("Failed to unsubscribe consumer")
        unsub.err = err
        // Set the state to ready for closing the consumer
-       pc.state = consumerReady
+       pc.setState(consumerReady)
        // Should'nt remove the consumer handler
        return
    }
@@ -224,7 +227,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
        pc.nackTracker.Close()
    }
    pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
-   pc.state = consumerClosed
+   pc.setState(consumerClosed)
 }

 func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
@@ -305,7 +308,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
 }

 func (pc *partitionConsumer) Close() {
-   if pc.state != consumerReady {
+   if pc.getState() != consumerReady {
        return
    }

@@ -334,7 +337,8 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
 }

 func (pc *partitionConsumer) requestSeek(msgID messageID) error {
-   if pc.state == consumerClosing || pc.state == consumerClosed {
+   pstate := pc.getState()
+   if pstate == consumerClosing || pstate == consumerClosed {
        pc.log.Error("Consumer was already closed")
        return nil
    }
@@ -376,7 +380,8 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {
 func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
    defer close(seek.doneCh)

-   if pc.state == consumerClosing || pc.state == consumerClosed {
+   pstate := pc.getState()
+   if pstate == consumerClosing || pstate == consumerClosed {
        pc.log.Error("Consumer was already closed")
        return
    }
@@ -738,11 +743,16 @@ func (pc *partitionConsumer) runEventsLoop() {

 func (pc *partitionConsumer) internalClose(req *closeRequest) {
    defer close(req.doneCh)
-   if pc.state != consumerReady {
+   pstate := pc.getState()
+   if pstate != consumerReady {
+       // this might be redundant but to ensure nack tracker is closed
+       if pc.nackTracker != nil {
+           pc.nackTracker.Close()
+       }
        return
    }

-   if pc.state == consumerClosed || pc.state == consumerClosing {
+   if pstate == consumerClosed || pstate == consumerClosing {
        pc.log.Error("The consumer is closing or has been closed")
        if pc.nackTracker != nil {
            pc.nackTracker.Close()
@@ -750,7 +760,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
        return
    }

-   pc.state = consumerClosing
+   pc.setState(consumerClosing)
    pc.log.Infof("Closing consumer=%d", pc.consumerID)

    requestID := pc.client.rpcClient.NewRequestID()
@@ -769,7 +779,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
        provider.Close()
    }

-   pc.state = consumerClosed
+   pc.setState(consumerClosed)
    pc.conn.DeleteConsumeHandler(pc.consumerID)
    if pc.nackTracker != nil {
        pc.nackTracker.Close()
@@ -790,7 +800,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
    }

    for maxRetry != 0 {
-       if pc.state != consumerReady {
+       if pc.getState() != consumerReady {
            // Consumer is already closing
            return
        }
@@ -876,7 +886,6 @@ func (pc *partitionConsumer) grabConn() error {

    res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
        pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
-
    if err != nil {
        pc.log.WithError(err).Error("Failed to create consumer")
        return err
@@ -908,7 +917,7 @@ func (pc *partitionConsumer) grabConn() error {
 }

 func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
-   if pc.state != consumerReady {
+   if pc.getState() != consumerReady {
        return trackingMessageID{}
    }
    wg := &sync.WaitGroup{}
@@ -1028,6 +1037,14 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
        })
 }

+func (pc *partitionConsumer) getState() consumerState {
+   return consumerState(pc.state.Load())
+}
+
+func (pc *partitionConsumer) setState(state consumerState) {
+   pc.state.Store(int32(state))
+}
+
 func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
    if msgID.Undefined() {
        return nil

Note: the diff is merely illustrative of the approach of using this particular package for the fix. Other changes, like the nackTracker change, have entered the main branch since the changes I made for #432 and are not germane to this issue.

I haven't determined whether this problem is specific to the regex consumer or a deeper issue with the module. It seems more likely to be the former, since all tests appear to run with the race detector on, and I'd think (hope? :D) it would have been seen by now otherwise.

If this is the only member accessed like this, it may be an acceptable fix to wrap access with something like go.uber.org/atomic, as I've done here, since it provides a convenient interface and the change is relatively straightforward. However, while writing the tests that surfaced this, I noticed that there's a lack of coordination between the regex consumer and its constituent sub-consumers. For the most part, it simply spins them up and only removes them if their topics go away. There's no way for the regex consumer to know that one of its consumers has died, much less that it died for a non-retryable reason (such as its topic going away, in this case). A potential fix might involve an event channel between the sub-consumers and the regex consumer (see the sketch below), plus some fixes to error introspection like I mentioned here, since currently the only way I can see to detect the condition is to inspect the error string itself, and that smells.
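A rough sketch of what that coordination could look like: everything here (consumerEvent, the event kinds, parentConsumer) is hypothetical and not part of the current API; it's only meant to illustrate the shape of the idea.

package main

import "fmt"

// Hypothetical event types a sub-consumer could publish when it terminates,
// so the parent regex consumer can decide whether to retry or drop the topic.
type consumerEventKind int

const (
    eventConsumerClosed consumerEventKind = iota // retryable: connection dropped, etc.
    eventTopicDeleted                            // non-retryable: the topic went away
)

type consumerEvent struct {
    topic string
    kind  consumerEventKind
    err   error
}

// parentConsumer stands in for the regex consumer, which would own the channel.
type parentConsumer struct {
    events chan consumerEvent
}

// watchSubConsumers drains termination events from sub-consumers and decides
// whether to drop the sub-consumer or attempt a resubscribe.
func (p *parentConsumer) watchSubConsumers() {
    for ev := range p.events {
        switch ev.kind {
        case eventTopicDeleted:
            fmt.Printf("dropping sub-consumer for %s: %v\n", ev.topic, ev.err)
        case eventConsumerClosed:
            fmt.Printf("resubscribing to %s after: %v\n", ev.topic, ev.err)
        }
    }
}

func main() {
    p := &parentConsumer{events: make(chan consumerEvent, 1)}
    p.events <- consumerEvent{topic: "persistent://public/default/foo", kind: eventTopicDeleted, err: fmt.Errorf("topic deleted")}
    close(p.events)
    p.watchSubConsumers()
}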

zzzming commented 3 years ago

It looks like this is a miss. The state was synchronized in producer_partition.go and connection.go with sync/atomic.

https://github.com/uber-go/atomic/blob/master/uint32.go uses sync/atomic underneath. Can we converge on the uber library for simplicity? @merlimat @flowchartsman
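For comparison, the two styles look roughly like this for a state field (a minimal sketch, not the actual producer or connection code):

package main

import (
    stdatomic "sync/atomic"

    uberatomic "go.uber.org/atomic"
)

type consumerState int

const (
    consumerInit consumerState = iota
    consumerReady
)

// Standard library style, as in producer_partition.go / connection.go:
// a plain int32 field manipulated only via sync/atomic helpers.
type withStdAtomic struct {
    state int32
}

func (c *withStdAtomic) setState(s consumerState) { stdatomic.StoreInt32(&c.state, int32(s)) }
func (c *withStdAtomic) getState() consumerState  { return consumerState(stdatomic.LoadInt32(&c.state)) }

// go.uber.org/atomic style: the wrapper type makes an accidental
// unsynchronized read or write impossible to write.
type withUberAtomic struct {
    state uberatomic.Int32
}

func (c *withUberAtomic) setState(s consumerState) { c.state.Store(int32(s)) }
func (c *withUberAtomic) getState() consumerState  { return consumerState(c.state.Load()) }

func main() {
    a := &withStdAtomic{}
    a.setState(consumerReady)
    _ = a.getState()

    b := &withUberAtomic{}
    b.setState(consumerReady)
    _ = b.getState()
}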

flowchartsman commented 3 years ago

Did #451 address this? If so, this should be closed as fixed, no?