wvanbergen / kafka

Load-balancing, resuming Kafka consumer for Go, backed by Zookeeper.
MIT License

Add heavier locking to the FinalizePartition() function #84

Closed: sparrc closed this 7 years ago

sparrc commented 8 years ago

When running ConsumerGroup.CommitUpto(msg) in a goroutine, I get a data race between FinalizePartition and CommitUpto.

Below is the output I get when running with the race detector:

2015/11/18 13:03:59 Started the kafka consumer service, peers: [192.168.99.100:2181], topics: [telegraf_test_topic_1447877039]
==================
WARNING: DATA RACE
Read by goroutine 36:
  github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).FinalizePartition()
      /Users/csparr/ws/go/src/github.com/influxdb/telegraf/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:130 +0x162
  github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).partitionConsumer()
      /Users/csparr/ws/go/src/github.com/influxdb/telegraf/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:416 +0x1688

Previous write by goroutine 41:
  github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed()
      /Users/csparr/ws/go/src/github.com/influxdb/telegraf/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:226 +0xb1
  github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed()
      /Users/csparr/ws/go/src/github.com/influxdb/telegraf/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x153
  github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto()
      /Users/csparr/ws/go/src/github.com/influxdb/telegraf/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:235 +0xde
  github.com/influxdb/telegraf/plugins/kafka_consumer.(*Kafka).parser()
      /Users/csparr/ws/go/src/github.com/influxdb/telegraf/plugins/kafka_consumer/kafka_consumer.go:132 +0x78c

Goroutine 36 (running) created at:
  github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicConsumer()
      /Users/csparr/ws/go/src/github.com/influxdb/telegraf/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:319 +0xec7

Goroutine 41 (finished) created at:
  github.com/influxdb/telegraf/plugins/kafka_consumer.(*Kafka).Start()
      /Users/csparr/ws/go/src/github.com/influxdb/telegraf/plugins/kafka_consumer/kafka_consumer.go:99 +0x33c
  github.com/influxdb/telegraf/plugins/kafka_consumer.TestReadsMetricsFromKafka()
      /Users/csparr/ws/go/src/github.com/influxdb/telegraf/plugins/kafka_consumer/kafka_consumer_integration_test.go:43 +0x9af
  testing.tRunner()
      /usr/local/Cellar/go/1.5.1/libexec/src/testing/testing.go:456 +0xdc
==================
2015/11/18 13:03:59 Could not parse kafka message: cpu_load_short,host=server01 1422568543702900257, error: unable to parse 'cpu_load_short,host=server01 1422568543702900257': invalid field format
2015/11/18 13:03:59 Kafka Consumer buffer is full, dropping a point. You may want to increase the point_buffer setting
PASS
Found 1 data race(s)

sparrc commented 8 years ago

I don't have a concise example that reproduces this, but the output shows that Goroutine 36 is spun up inside the consumergroup package, so it is not something I have control over. My code spins up Goroutine 41 and calls CommitUpto, which eventually calls pOT.markAsProcessed().

The write in markAsProcessed conflicts with a read performed by FinalizePartition(). Adding a heavier lock to FinalizePartition() shouldn't noticeably affect performance, because the function is only called once per partition.
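
To illustrate the kind of conflict described above, here is a minimal, self-contained sketch. It is not the library's code or the change in this PR: the offsetTracker type, its fields, and the finalize and run helpers are hypothetical stand-ins. The idea it shows is that the read on the finalize path takes the same mutex that guards the write in markAsProcessed, so the race detector no longer sees an unsynchronized access.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetTracker is a hypothetical stand-in for a per-partition offset
// tracker; the type and field names are assumptions for illustration.
type offsetTracker struct {
	mu                     sync.Mutex
	highestProcessedOffset int64
}

// markAsProcessed records offset if it is newer than the stored one.
// It reports whether the stored offset advanced.
func (t *offsetTracker) markAsProcessed(offset int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if offset > t.highestProcessedOffset {
		t.highestProcessedOffset = offset
		return true
	}
	return false
}

// finalize reads the stored offset while holding the same mutex as the
// writer. Reading the field without the lock is the racy pattern.
func (t *offsetTracker) finalize() int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.highestProcessedOffset
}

// run marks offsets 1..n from n goroutines while finalize is called
// concurrently, then returns the final stored offset.
func run(n int) int64 {
	t := &offsetTracker{highestProcessedOffset: -1}
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(off int64) {
			defer wg.Done()
			t.markAsProcessed(off)
		}(int64(i))
	}
	_ = t.finalize() // concurrent read while commits are still in flight
	wg.Wait()
	return t.finalize()
}

func main() {
	fmt.Println(run(100)) // prints 100
}
```

Running this with `go run -race` should report no data race. Removing the Lock/Unlock pair from finalize should bring back a report similar in shape to the one above, though scheduling can affect whether a given run triggers it.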

nicolopignarocket commented 8 years ago

:+1: on this; I'm reproducing the same issue.

sparrc commented 8 years ago

@wvanbergen What do you think of this PR? Any thoughts?

wvanbergen commented 8 years ago

I need to spend some time on a proper review. I remember making the locking more granular like this to prevent potential deadlocks. That was a while ago, though, so it may no longer be true or relevant.

sparrc commented 7 years ago

Closing this because it doesn't appear to be getting merged.