IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

Error and data race using transaction example from the library #2913

Open bentcoder opened 1 month ago

bentcoder commented 1 month ago
Description

Hi,

Using a pool of producers, as shown in this example, from time to time causes the error "kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing" and the data race shown below.

Thanks

Versions
Sarama: v1.43.2
Kafka: v3.3.2
Go: 1.22
Configuration
    cfg := sarama.NewConfig()
    cfg.ClientID = clientID
    cfg.Version = sarama.V3_3_2_0
    cfg.Net.MaxOpenRequests = 1
    cfg.Metadata.AllowAutoTopicCreation = false
    cfg.Producer.Return.Successes = true
    cfg.Producer.Compression = sarama.CompressionZSTD
    cfg.Producer.Idempotent = true
    cfg.Producer.RequiredAcks = sarama.WaitForAll
    cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner
    cfg.Producer.Transaction.ID = transactionID
    cfg.Producer.Transaction.Retry.Max = 10
    cfg.Producer.Transaction.Retry.Backoff = time.Millisecond * 100
Additional Context
==================
WARNING: DATA RACE
Write at 0x00c00010f310 by goroutine 251:
  github.com/IBM/sarama.(*transactionManager).transitionTo()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/transaction_manager.go:220 +0x330
  github.com/IBM/sarama.(*asyncProducer).maybeTransitionToErrorState()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1276 +0xec
  github.com/IBM/sarama.(*asyncProducer).returnError()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1286 +0xad
  github.com/IBM/sarama.(*asyncProducer).retryMessage()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1323 +0x109
  github.com/IBM/sarama.(*asyncProducer).retryMessages()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1332 +0x8c
  github.com/IBM/sarama.(*brokerProducer).handleError.func2()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1197 +0x8f4
  github.com/IBM/sarama.(*produceSet).eachPartition()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/produce_set.go:223 +0x89b
  github.com/IBM/sarama.(*brokerProducer).handleError()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1196 +0x3cd
  github.com/IBM/sarama.(*brokerProducer).handleResponse()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1060 +0x84
  github.com/IBM/sarama.(*brokerProducer).run()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:993 +0x1a93
  github.com/IBM/sarama.(*brokerProducer).run-fm()
      <autogenerated>:1 +0x33
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:791 +0x33

Previous read at 0x00c00010f310 by goroutine 252:
  github.com/IBM/sarama.(*transactionManager).publishTxnPartitions()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/transaction_manager.go:766 +0x40e
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:818 +0x23c
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap2()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:794 +0x33

Goroutine 251 (running) created at:
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:791 +0x4f3
  github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1343 +0x133
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:765 +0x244
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
      /Users/inan/go/pkg/mod/github.com/eapache/go-resiliency@v1.6.0/breaker/breaker.go:94 +0x83
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
      /Users/inan/go/pkg/mod/github.com/eapache/go-resiliency@v1.6.0/breaker/breaker.go:95 +0x3e
  github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
      /Users/inan/go/pkg/mod/github.com/eapache/go-resiliency@v1.6.0/breaker/breaker.go:58 +0x88
  github.com/IBM/sarama.(*partitionProducer).updateLeader()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:756 +0xca
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:619 +0xcb
  github.com/IBM/sarama.(*partitionProducer).dispatch()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:689 +0xd08
  github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
      <autogenerated>:1 +0x33
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:600 +0x33

Goroutine 252 (running) created at:
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:794 +0x690
  github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:1343 +0x133
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:765 +0x244
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
      /Users/inan/go/pkg/mod/github.com/eapache/go-resiliency@v1.6.0/breaker/breaker.go:94 +0x83
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
      /Users/inan/go/pkg/mod/github.com/eapache/go-resiliency@v1.6.0/breaker/breaker.go:95 +0x3e
  github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
      /Users/inan/go/pkg/mod/github.com/eapache/go-resiliency@v1.6.0/breaker/breaker.go:58 +0x88
  github.com/IBM/sarama.(*partitionProducer).updateLeader()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:756 +0xca
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:619 +0xcb
  github.com/IBM/sarama.(*partitionProducer).dispatch()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:689 +0xd08
  github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
      <autogenerated>:1 +0x33
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/sarama@v1.43.2/async_producer.go:600 +0x33
==================
puellanivis commented 1 month ago

Hm… yeah. The transitionTo() call is holding the statusLock, while publishTxnPartitions() is holding the partitionTxnLock.

They should be holding a common lock, or they will race. :(

bentcoder commented 1 month ago

More conversation can be found here for reference.