wvanbergen / kafka

Load-balancing, resuming Kafka consumer for go, backed by Zookeeper.
MIT License
373 stars 141 forks source link

Can't receive messages when i test the demo which the project gives. #126

Closed runnerliu closed 6 years ago

runnerliu commented 6 years ago

Here's my code. `package main

import ( "flag" "log" "os" "os/signal" "strings" "time"

"fmt"
"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
"github.com/wvanbergen/kazoo-go"

)

const ( DefaultKafkaTopics = "kafka_performance_test" DefaultConsumerGroup = "testsarama1" MAX_COUNT = 1000000 )

var ( consumerGroup = flag.String("group", DefaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing") kafkaTopicsCSV = flag.String("topics", DefaultKafkaTopics, "The comma-separated list of topics to consume") zookeeper = flag.String("zookeeper", "openlive-kafka-online001-bjlt.qiyi.virtual:2181,openlive-kafka-online002-bjlt.qiyi.virtual:2181,openlive-kafka-online003-bjlt.qiyi.virtual:2181,openlive-kafka-online004-bjlt.qiyi.virtual:2181,openlive-kafka-online005-bjlt.qiyi.virtual:2181", "A comma-separated Zookeeper connection string (e.g. zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181)") zookeeperNodes []string )

func init() { sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags) }

func main() { flag.Parse()

if *zookeeper == "" {
    flag.PrintDefaults()
    os.Exit(1)
}

config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = 10 * time.Second

zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)

kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")

consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
if consumerErr != nil {
    log.Fatalln(consumerErr)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
    <-c
    if err := consumer.Close(); err != nil {
        sarama.Logger.Println("Error closing the consumer", err)
    }
}()

go func() {
    for err := range consumer.Errors() {
        log.Println(err)
    }
}()

eventCount := 0
offsets := make(map[string]map[int32]int64)
start_time := time.Now().UnixNano() / 1000000

fmt.Println(len(consumer.Messages()))

for message := range consumer.Messages() {

    fmt.Println(string(message.Value))
    if offsets[message.Topic] == nil {
        offsets[message.Topic] = make(map[int32]int64)
    }

    if eventCount > MAX_COUNT {
        break
    }

    fmt.Println("eventCount:", eventCount)
    eventCount += 1

    if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
        log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
    }

    // Simulate processing time
    // time.Sleep(10 * time.Millisecond)

    offsets[message.Topic][message.Partition] = message.Offset
    consumer.CommitUpto(message)
}

fmt.Println("total_count: ", eventCount)
end_time := time.Now().UnixNano() / 1000000
process_time := end_time - start_time
fmt.Println("process_time: ", process_time)

} and my output follows: 2018/01/09 17:15:05 Connected to 10.13.44.23:2181 2018/01/09 17:15:05 Authenticated: id=170991064662003625, timeout=6000 [Sarama] 2018/01/09 17:15:05 Initializing new client [Sarama] 2018/01/09 17:15:05 client/metadata fetching metadata for all topics from broker openlive-kafka-online005-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 Connected to broker at openlive-kafka-online005-bjlt.qiyi.virtual:9092 (unregistered) [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #5 at openlive-kafka-online005-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #10 at openlive-kafka-online014-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #1 at openlive-kafka-online001-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #6 at openlive-kafka-online010-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #9 at openlive-kafka-online013-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #2 at openlive-kafka-online002-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #7 at openlive-kafka-online011-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #3 at openlive-kafka-online003-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #8 at openlive-kafka-online012-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #4 at openlive-kafka-online004-bjlt.qiyi.virtual:9092 [Sarama] 2018/01/09 17:15:05 Successfully initialized new client [Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] Consumer instance registered (hcdn-others-worker-dev100-bjlt.qiyi.virtual:90c7388b-c001-45bc-8dc9-d2a251a23aba). len(consumer.Messages()): 0 [Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] Currently registered consumers: 1 [Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] kafka_performance_test :: Started topic consumer [Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] kafka_performance_test :: Claiming 10 of 10 partitions [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/1 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/6 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online005-bjlt.qiyi.virtual:9092 (registered as #5) [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/0 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/3 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online014-bjlt.qiyi.virtual:9092 (registered as #10) [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online004-bjlt.qiyi.virtual:9092 (registered as #4) [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/8 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 consumer/broker/5 added subscription to kafka_performance_test/1 [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/9 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online002-bjlt.qiyi.virtual:9092 (registered as #2) [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online011-bjlt.qiyi.virtual:9092 (registered as #7) [Sarama] 2018/01/09 17:15:06 consumer/broker/10 added subscription to kafka_performance_test/6 [Sarama] 2018/01/09 17:15:06 consumer/broker/4 added subscription to kafka_performance_test/0 [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online003-bjlt.qiyi.virtual:9092 (registered as #3) [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/5 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 consumer/broker/2 added subscription to kafka_performance_test/8 [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/4 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 consumer/broker/7 added subscription to kafka_performance_test/3 [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/7 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online013-bjlt.qiyi.virtual:9092 (registered as #9) [Sarama] 2018/01/09 17:15:06 consumer/broker/3 added subscription to kafka_performance_test/9 [Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/2 :: Partition consumer listening for new messages only. [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online001-bjlt.qiyi.virtual:9092 (registered as #1) [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online012-bjlt.qiyi.virtual:9092 (registered as #8) [Sarama] 2018/01/09 17:15:06 consumer/broker/9 added subscription to kafka_performance_test/5 [Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online010-bjlt.qiyi.virtual:9092 (registered as #6) [Sarama] 2018/01/09 17:15:06 consumer/broker/1 added subscription to kafka_performance_test/7 [Sarama] 2018/01/09 17:15:06 consumer/broker/8 added subscription to kafka_performance_test/4 [Sarama] 2018/01/09 17:15:06 consumer/broker/6 added subscription to kafka_performance_test/2`

You can see i received anything from kafka, and the length of consumer.Messages() is 0, i don't know the mistakes where i take,please do me a favour. Thanks!

And i find that if i change a new Consumergroup , i encounter the following mistake, but i don't know how to fix it.

[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] Consumergroup testsarama3 does not yet exists, creating... [Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] Consumer instance registered (hcdn-others-worker-dev100-bjlt.qiyi.virtual:0cfbc8c2-5203-4d3d-8283-77ab52d3afbb). len(consumer.Messages()): 0 [Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] Currently registered consumers: 1 [Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] kafka_performance_test :: Started topic consumer [Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] kafka_performance_test :: Claiming 10 of 10 partitions [Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/5 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists [Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/3 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists [Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/7 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists [Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/6 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists [Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/2 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists [Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/8 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists [Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/1 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists [Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/9 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists [Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/4 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists

wvanbergen commented 6 years ago

Is the topic seeing any live traffic? Your config is requesting for new messages only, so any existing messages when the consumer started will not be returned.

runnerliu commented 6 years ago

ops!I set the value of config.Offsets.Initial to sarama.OffsetOldest, and my code works!Thanks a lot!

runnerliu commented 6 years ago

Emm... I have another question to ask, does the lib consumer balanced? I tried to make some tests to verify my doubt, but i'm not have a conclusion yet, Could you tell me? thanks! @wvanbergen

wvanbergen commented 6 years ago

Yes, it balances the partitions over all running instances, and rebalances when one goes away or gets added.