wvanbergen / kafka

Load-balancing, resuming Kafka consumer for go, backed by Zookeeper.
MIT License

Failure to use select instead of range #73

Closed: echupriyanov closed this issue 9 years ago

echupriyanov commented 9 years ago

Hello,

I'm trying to write a small consumer group application using several workers. Almost everything works correctly, but when I change the logic in the supplied example (consumergroup.go) from:

    for message := range consumer.Messages() {
        if offsets[message.Topic] == nil {
            offsets[message.Topic] = make(map[int32]int64)
        }
        ...
    }

to:

    for {
        select {
        case message := <-consumer.Messages():
            if offsets[message.Topic] == nil {
                offsets[message.Topic] = make(map[int32]int64)
            }
            ...
        }
    }

and then try to stop the worker with ^C, I get a nil pointer dereference:

    [Sarama] 2015/08/16 16:59:58 [consumer_example.go/4932df2e5698] ip-rep-input :: Stopped topic consumer
    [Sarama] 2015/08/16 16:59:58 [consumer_example.go/4932df2e5698] Deregistered consumer instance Greyhound.local:9b307c47-696a-4419-9b93-4932df2e5698.
    [Sarama] 2015/08/16 16:59:58 Closing Client
    panic: runtime error: invalid memory address or nil pointer dereference
    [signal 0xb code=0x1 addr=0x0 pc=0x2d9d]

    goroutine 1 [running]:
    main.main()
        /Users/eric/sync/Work/go/src/cybertonica.com/irb/cg/cg.go:75 +0xbcd

What am I doing wrong?

The full modified source code is:

package main

import (
    "flag"
    "log"
    "os"
    "os/signal"
    "strings"
    "time"

    "github.com/Shopify/sarama"
    "github.com/wvanbergen/kafka/consumergroup"
    "github.com/wvanbergen/kazoo-go"
)

const (
    defaultKafkaTopics   = "test_topic"
    defaultConsumerGroup = "consumer_example.go"
)

var (
    consumerGroup  = flag.String("group", defaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing")
    kafkaTopicsCSV = flag.String("topics", defaultKafkaTopics, "The comma-separated list of topics to consume")
    zookeeper      = flag.String("zookeeper", "", "A comma-separated Zookeeper connection string (e.g. `zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181`)")

    zookeeperNodes []string
)

func init() {
    sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}

func main() {
    flag.Parse()

    if *zookeeper == "" {
        flag.PrintDefaults()
        os.Exit(1)
    }

    config := consumergroup.NewConfig()
    config.Offsets.Initial = sarama.OffsetNewest
    config.Offsets.ProcessingTimeout = 10 * time.Second

    zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)

    kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")

    consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
    if consumerErr != nil {
        log.Fatalln(consumerErr)
    }

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    go func() {
        <-c
        if err := consumer.Close(); err != nil {
            sarama.Logger.Println("Error closing the consumer", err)
        }
    }()

    go func() {
        for err := range consumer.Errors() {
            log.Println(err)
        }
    }()

    eventCount := 0
    offsets := make(map[string]map[int32]int64)

    for {
        select {
        case message := <-consumer.Messages():
            if offsets[message.Topic] == nil {
                offsets[message.Topic] = make(map[int32]int64)
            }

            eventCount++
            if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
                log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]-1)
            }

            // Simulate processing time
            time.Sleep(10 * time.Millisecond)

            offsets[message.Topic][message.Partition] = message.Offset
            consumer.CommitUpto(message)
        }
    }

    log.Printf("Processed %d events.", eventCount)
    log.Printf("%+v", offsets)
}
wvanbergen commented 9 years ago

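The problem is that once you interrupt the consumer, the Messages() channel is eventually closed. Once a channel is closed and drained, every receive from it returns immediately with the zero value of its element type. For this channel that is a nil *sarama.ConsumerMessage, so accessing message.Topic panics. A range loop exits automatically when the channel is closed, but with select you have to check for this yourself.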
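The closed-channel behaviour described above can be checked in isolation. The sketch below uses a hypothetical Message type as a stand-in for *sarama.ConsumerMessage, so no Kafka or Zookeeper is needed. It shows that buffered values are still delivered after close, that a drained closed channel then yields nil with ok == false, and that range simply stops.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for *sarama.ConsumerMessage;
// only the fact that the channel carries pointers matters here.
type Message struct {
	Topic string
}

// drainClosed closes a channel holding one buffered message and then
// receives from it twice, reporting what each receive returned.
func drainClosed() (first *Message, firstOK bool, second *Message, secondOK bool) {
	ch := make(chan *Message, 1)
	ch <- &Message{Topic: "test_topic"}
	close(ch)

	first, firstOK = <-ch   // buffered values are still delivered after close
	second, secondOK = <-ch // drained: returns nil and ok == false immediately
	return
}

// countRange shows that range exits on its own once the channel is
// closed and all buffered values have been received.
func countRange(msgs []*Message) int {
	ch := make(chan *Message, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)

	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	first, ok1, second, ok2 := drainClosed()
	fmt.Println(first.Topic, ok1)   // test_topic true
	fmt.Println(second == nil, ok2) // true false
	// Reading second.Topic here would panic with a nil pointer dereference,
	// which is what happens in the select loop from the question.

	fmt.Println(countRange([]*Message{{Topic: "a"}, {Topic: "b"}})) // 2
}
```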

You can detect this with the two-value form of the receive:

    for {
        select {
        case message, ok := <-consumer.Messages():
            if !ok { // channel is closed
                return
            }
            ...
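Filling out the snippet above, here is a minimal self-contained sketch of the consume loop using the two-value receive. It assumes a hypothetical Message type and a plain channel in place of consumer.Messages(), and adds a hypothetical ticker case only so the select has more than one case. One design choice worth noting: a bare break inside select leaves only the select, not the enclosing for loop, so the sketch uses a labeled break. Unlike return, this lets the summary logging after the loop run; in the program from the question, those final log.Printf calls are unreachable.

```go
package main

import (
	"fmt"
	"time"
)

// Message is a hypothetical stand-in for *sarama.ConsumerMessage.
type Message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// consume reads until msgs is closed, recording the last offset seen per
// topic/partition. It uses select with the two-value receive and leaves
// the loop with a labeled break so the caller can report a summary.
func consume(msgs <-chan *Message) (eventCount int, offsets map[string]map[int32]int64) {
	offsets = make(map[string]map[int32]int64)
	ticker := time.NewTicker(50 * time.Millisecond) // hypothetical periodic work
	defer ticker.Stop()

loop:
	for {
		select {
		case message, ok := <-msgs:
			if !ok { // channel closed: stop instead of using a nil message
				break loop
			}
			if offsets[message.Topic] == nil {
				offsets[message.Topic] = make(map[int32]int64)
			}
			eventCount++
			offsets[message.Topic][message.Partition] = message.Offset
		case <-ticker.C:
			// periodic work (metrics, flushing, ...) would go here
		}
	}
	return eventCount, offsets
}

func main() {
	msgs := make(chan *Message)
	go func() {
		for i := int64(1); i <= 3; i++ {
			msgs <- &Message{Topic: "test_topic", Partition: 0, Offset: i}
		}
		close(msgs) // plays the role of consumer.Close() closing Messages()
	}()

	eventCount, offsets := consume(msgs)
	fmt.Printf("Processed %d events.\n", eventCount)
	fmt.Printf("%+v\n", offsets)
}
```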
echupriyanov commented 9 years ago

Thanks! That was the issue. I didn't expect select to keep trying to read after I hit ^C.