elodina / go_kafka_client

Apache Kafka Client Library for Go
http://www.elodina.net
Apache License 2.0
275 stars 74 forks source link

Rebalance breaks when running multiple consumers #98

Open baconalot opened 9 years ago

baconalot commented 9 years ago

Usecase: Run N chronos/mesos jobs for a singe consumergroup where N == the number of partitions in the topic. (Lets assume a single topic consumergroup here)

Test: -Create a go consumer that eats from a large local kafka/topic with 2 partitions. -Start once (pid 1) -> looks ok, alternates between partition 0 and 1 -Start another (pid 2) -> looks ok, consumes only from one partition -But pid 1 is then crashed with following stack:

<<happily consuming here>>
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8002}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8003}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8004}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8005}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8006}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8007}
2015-04-23/17:22:25 [DEBUG] [zk] Getting info for broker 0
2015-04-23/17:22:25 [DEBUG] [zk] Trying to get partition assignments for topics [dev_allrequests_3]
2015-04-23/17:22:25 [DEBUG] [zk] Getting consumers in group asdasdsdads_cango
2015-04-23/17:22:25 [DEBUG] [zk] Getting consumers in group asdasdsdads_cango
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Releasing partition ownership
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Successfully released partition ownership
2015-04-23/17:22:25 [INFO] [zk] Commencing assertion series at /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69
2015-04-23/17:22:25 [INFO] [zk] Joining state barrier /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69
2015-04-23/17:22:25 [DEBUG] [zk] Trying to create path /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69/ebdffa95-ce32-e383-fddb-eec3d9a2e571 in Zookeeper
2015-04-23/17:22:25 [INFO] [zk] Successfully joined state barrier /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69
2015-04-23/17:22:25 [DEBUG] [zk] Trying to assert rebalance state for group asdasdsdads_cango and hash e8ee221f572c5a810b7c60818eadeb69 with 2
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] [%!!(MISSING)s(int32=0) %!!(MISSING)s(int32=1)]
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] {ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)} attempting to claim {Topic: dev_allrequests_3, Partition: 1}
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Consumer is trying to reflect partition ownership decision: map[{dev_allrequests_3 1}:{ebdffa95-ce32-e383-fddb-eec3d9a2e571 0}]

2015-04-23/17:22:25 [DEBUG] [zk] Trying to create path /consumers/asdasdsdads_cango/owners/dev_allrequests_3 in Zookeeper
2015-04-23/17:22:25 [DEBUG] [zk] Successfully claimed partition 1 in topic dev_allrequests_3 for {ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)}
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Consumer successfully claimed partition 1 for topic dev_allrequests_3
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Partition ownership has been successfully reflected
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Trying to reinitialize fetchers and workers
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Trying to update fetcher
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Updating fetcher with numStreams = 1
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Topic Registry = map[dev_allrequests_3:map[%!s(int32=1):{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: -1, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Fetcher Manager started
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] TopicInfos = [{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: -1, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Received asknext for {Topic: dev_allrequests_3, Partition: 1}
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Partition map: map[{dev_allrequests_3 0}:8008 {dev_allrequests_3 1}:7506]
2015-04-23/17:22:25 [DEBUG] [Sarama client] Adding block: topic=dev_allrequests_3, partition=1, offset=7506, fetchsize=1048576
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Sent partition data to {dev_allrequests_3 %!s(int32=1)}
2015-04-23/17:22:25 [DEBUG] [Sarama client] Processing fetch response
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Received asknext for {Topic: dev_allrequests_3, Partition: 0}
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Partition map: map[{dev_allrequests_3 0}:8008 {dev_allrequests_3 1}:7506]
2015-04-23/17:22:25 [DEBUG] [Sarama client] Adding block: topic=dev_allrequests_3, partition=0, offset=8008, fetchsize=1048576
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Sent partition data to {dev_allrequests_3 %!s(int32=0)}
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Updating fetcher configuration
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Got new list of partitions to process map[{dev_allrequests_3 1}:{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: 0, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] All partitions map: map[]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] There are obsolete partitions [{dev_allrequests_3 0}]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Fetcher ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0 parition map before obsolete partitions removal%!(EXTRA map[go_kafka_client.TopicAndPartition]int64=map[{dev_allrequests_3 0}:8008 {dev_allrequests_3 1}:7506])
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Remove partitions
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Stopping worker manager for {dev_allrequests_3 %!s(int32=0)}
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Trying to stop workerManager
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopping manager
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopping processor
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Successful manager stop
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopping committer
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Successful committer stop
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopped failure counter
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Leaving manager stop
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopped workerManager
2015-04-23/17:22:25 [DEBUG] [zk] Trying to update path /consumers/asdasdsdads_cango/offsets/dev_allrequests_3/0
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Stopping buffer: {Topic: dev_allrequests_3, Partition: 0}-MessageBuffer
2015-04-23/17:22:25 [INFO] [{Topic: dev_allrequests_3, Partition: 0}-MessageBuffer] Trying to stop message buffer
2015-04-23/17:22:25 [INFO] [{Topic: dev_allrequests_3, Partition: 0}-MessageBuffer] Stopping message buffer
2015-04-23/17:22:25 [INFO] [{Topic: dev_allrequests_3, Partition: 0}-MessageBuffer] Stopped message buffer
2015-04-23/17:22:25 [DEBUG] [Sarama client] Processing fetch response
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Fetcher ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0 parition map after obsolete partitions removal%!(EXTRA map[go_kafka_client.TopicAndPartition]int64=map[{dev_allrequests_3 1}:7506])
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Shutting down idle fetchers
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Closed idle fetchers
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Adding fetcher for partitions map[]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] partitionsPerFetcher: map[]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Applied new partition map map[{dev_allrequests_3 1}:{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: 0, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Notifying all waiters about completed update
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Updated fetcher
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Fetcher has been updated &{ebdffa95-ce32-e383-fddb-eec3d9a2e571 asdasdsdads_cango map[dev_allrequests_3:[{ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)}]] %!s(*go_kafka_client.StaticTopicsToNumStreams=&{ebdffa95-ce32-e383-fddb-eec3d9a2e571 map[dev_allrequests_3:1]}) map[dev_allrequests_3:[%!s(int32=0) %!s(int32=1)]] map[dev_allrequests_3:[{934b2c74-7e81-8d6b-52c4-fd034c9a273e %!s(int=0)} {ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)}]] [934b2c74-7e81-8d6b-52c4-fd034c9a273e ebdffa95-ce32-e383-fddb-eec3d9a2e571] [{Version: 1, Id: 0, Host: localhost, Port: 9092}] [all topics here]}
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Initializing worker managers from topic registry: map[dev_allrequests_3:map[%!s(int32=1):{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: -1, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]]
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Restarted streams
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Fetchers and workers have been successfully reinitialized
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Rebalance has been successfully completed
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x4b4388]

goroutine 62144 [running]:
github.com/stealthly/go_kafka_client.func·015()
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/fetcher.go:343 +0x618
github.com/stealthly/go_kafka_client.inReadLock(0xc208011048, 0xc2083bcf80)
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/utils.go:52 +0x54
github.com/stealthly/go_kafka_client.(*consumerFetcherRoutine).processPartitionData(0xc2086f9450, 0xc2086ec840, 0x11, 0x0, 0xc20c616000, 0x1d6, 0x200)
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/fetcher.go:348 +0x141
created by github.com/stealthly/go_kafka_client.func·013
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/fetcher.go:268 +0xb82

<<more here>>

Fix: I am not sure if this has some unwanted side effects but I was able to fix this in go_kafka_client.fetcher.go:

func (f *consumerFetcherRoutine) processPartitionData(topicAndPartition TopicAndPartition, messages []*Message) {
    Trace(f, "Trying to acquire lock for partition processing")
    inReadLock(&f.manager.updateLock, func() {
        for f.manager.updateInProgress {
            f.manager.updatedCond.Wait()
        }
        Tracef(f, "Processing partition data for %s", topicAndPartition)
        if len(messages) > 0 {
+           if f.allPartitionMap[topicAndPartition] == nil{
+               return
+           }
            f.partitionMap[topicAndPartition] = messages[len(messages)-1].Offset + 1
            f.allPartitionMap[topicAndPartition].Buffer.addBatch(messages)
            Debugf(f, "Sent partition data to %s", topicAndPartition)
        } else {
            Trace(f, "Got empty message. Ignoring...")
        }
    })
}
serejja commented 9 years ago

Hey @baconalot please checkout the latest master, we got rid of using the allPartitionMap (which is the topic-partition map used by fetcher manager) in fetcherRoutines and this problem should vanish now. Let us know though if doesn't work as expected for you.

Thanks!

baconalot commented 9 years ago

Hi @serejja that helps with the crash, which does not occur anymore. But.. now either one of the pids get all the partitions or nothing. Is is arbitrary if the latest started one gets the partitions or they remain with the first started, but they never balance like: pid1 -> part1 + part2 [enter pid2] pid1 -> part2 & pid2 -> part1.

serejja commented 9 years ago

Hi @baconalot,

sorry for getting this abandoned. Does this still occur? Lots of changes were made since then including fixing lots of rebalance issues.

Thanks!