wvanbergen / kafka

Load-balancing, resuming Kafka consumer for go, backed by Zookeeper.
MIT License
373 stars 141 forks source link

ConsumerGroup.Messages() #111

Open orange-jacky opened 7 years ago

orange-jacky commented 7 years ago

i create lots of ConsumerGroup with JoinConsumerGroup, and fetch msg from kafka by for event := range consumer.Messages() { // Process event log.Println(string(event.Value)) eventCount += 1

// Ack event
consumer.CommitUpto(event)

}

when i run my program, consumer.Messages() return nil. the fowllowing my code

`type KafkaConsumer struct { Topic string Kafka //kafka configure GroupConsumer consumergroup.ConsumerGroup Msgs chan sarama.ConsumerMessage //提供给外部程序使用 Exit chan bool }

func (k Kafka) NewConsumer(topic string) (*KafkaConsumer, error) {

config := consumergroup.NewConfig()
config.Offsets.Initial = Convert(k.Where)
config.Offsets.ProcessingTimeout = 10 * time.Second
var zookeeperNodes []string
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(topic)
k.Alltopic = append(k.Alltopic, topic)

groupconsumer, err := consumergroup.JoinConsumerGroup(k.Groupname, zookeeperNodes, k.Zookeeper, config)
if err != nil {
    return nil, err
}

msgchan := make(chan *sarama.ConsumerMessage, 30)

ret := &KafkaConsumer{Kafka: k, GroupConsumer: groupconsumer,
    Msgs: msgchan, Exit: make(chan bool, 1), Topic: topic}
log.Printf("new a zookeeper consumer, info:%s, topic:%s, groupconsumer:%+v\n", k.String(), topic, ret)
return ret, nil

}

func (k *KafkaConsumer) Close() error { if k.GroupConsumer != nil { if err := k.GroupConsumer.Close(); err != nil { log.Printf("stop a zookeeper consumer fail. err:%s hostinfo:%s\n", err.Error(), k.String()) return err } else { log.Printf("stop a zookeeper consumer success, hostinfo:%s\n", k.String()) } } k.Exit <- true return nil }

func (k *KafkaConsumer) String() string { return fmt.Sprintf("%s input_topic:%s, groupconsumer:%+v, msgchan=%+v", k.Kafka.String(), k.Topic, k.GroupConsumer, k.Msgs) }

func (k KafkaConsumer) Dispatcher(mylog Log) { ticker := time.NewTicker(time.Second * 10) log.Printf("topic:%s start a dispatcher\n", k.Topic) for { select { case msg := <-k.GroupConsumer.Messages(): if msg != nil && msg.Value != nil && len(msg.Value) > 0 { k.Msgs <- msg k.GroupConsumer.CommitUpto(msg) mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, msg_len=%d, send to recevie chan", k.Alltopic, msg.Topic, len(msg.Value)) } else { mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v,error format msg:%+v", k.Alltopic, k.Topic, msg) } case err := <-k.GroupConsumer.Errors(): mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, fetch msg err,%+v", k.Alltopic, k.Topic, err) case <-k.Exit: log.Printf("conumser_topic:%+v, msg_topic:%+v, dispatcher exit\n", k.Alltopic, k.Topic) ticker.Stop() return case <-ticker.C: mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, dispatcher ticker 10s, Consumer:%+v, chan_len:%d", k.Alltopic, k.Topic, k) } } }`

how to fix it? best wishes.

wvanbergen commented 7 years ago

Once you close the consumer, the Messages() and Errors() channels will be closed, which means they will start returning nil values continuously. You should break your loop when that happens.

orange-jacky commented 7 years ago

kafka has data without dealing, i break my loop, than i can't fetch data without deal, this don't suit for the act.

orange-jacky commented 7 years ago

# a.go file

import ( "fmt" "github.com/Shopify/sarama" "github.com/wvanbergen/kafka/consumergroup" "github.com/wvanbergen/kazoo-go" "log" "strings" "time" )

//kafka基本信息 type Kafka struct { Hosts []string Where string //从哪儿开始读取 Zookeeper []string //zookeeper主机 Groupname string //分组名 Alltopic []string }

func (k Kafka) String() string { return fmt.Sprintf("zookeeper=%s, groupname=%s, where=%s, alltopic=%s", strings.Join(k.Zookeeper, ","), k.Groupname, k.Where, strings.Join(k.Alltopic, ",")) }

//按组消费者基本信息 type KafkaConsumer struct { Topic string Kafka //kafka configure GroupConsumer consumergroup.ConsumerGroup Msgs chan sarama.ConsumerMessage //提供给外部程序使用 Exit chan bool }

func (k Kafka) NewConsumer(topic string) (*KafkaConsumer, error) {

config := consumergroup.NewConfig()
config.Offsets.Initial = Convert(k.Where)
config.Offsets.ProcessingTimeout = 10 * time.Second
var zookeeperNodes []string
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(topic)
k.Alltopic = append(k.Alltopic, topic)

groupconsumer, err := consumergroup.JoinConsumerGroup(k.Groupname, zookeeperNodes, k.Zookeeper, config)
if err != nil {
    return nil, err
}

msgchan := make(chan *sarama.ConsumerMessage, 30)

ret := &KafkaConsumer{Kafka: k, GroupConsumer: groupconsumer,
    Msgs: msgchan, Exit: make(chan bool, 1), Topic: topic}
log.Printf("new a zookeeper consumer, info:%s, topic:%s, groupconsumer:%+v\n", k.String(), topic, ret)
return ret, nil

}

func (k *KafkaConsumer) Close() error { if k.GroupConsumer != nil { if err := k.GroupConsumer.Close(); err != nil { log.Printf("stop a zookeeper consumer fail. err:%s hostinfo:%s\n", err.Error(), k.String()) return err } else { log.Printf("stop a zookeeper consumer success, hostinfo:%s\n", k.String()) } } k.Exit <- true return nil }

func (k *KafkaConsumer) String() string { return fmt.Sprintf("%s input_topic:%s, groupconsumer:%+v, msgchan=%+v", k.Kafka.String(), k.Topic, k.GroupConsumer, k.Msgs) }

func (k KafkaConsumer) Dispatcher(mylog Log) { ticker := time.NewTicker(time.Second * 10) log.Printf("topic:%s start a dispatcher\n", k.Topic) for { select { case msg := <-k.GroupConsumer.Messages(): if msg != nil && msg.Value != nil && len(msg.Value) > 0 { k.Msgs <- msg k.GroupConsumer.CommitUpto(msg) mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, msg_len=%d, send to recevie chan", k.Alltopic, msg.Topic, len(msg.Value)) } else { mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v,error format msg:%+v", k.Alltopic, k.Topic, msg) } case err := <-k.GroupConsumer.Errors(): mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, fetch msg err,%+v", k.Alltopic, k.Topic, err) case <-k.Exit: log.Printf("conumser_topic:%+v, msg_topic:%+v, dispatcher exit\n", k.Alltopic, k.Topic) ticker.Stop() return case <-ticker.C: mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, dispatcher ticker 10s, Consumer:%+v, chan_len:%d", k.Alltopic, k.Topic, k) } } }

//转换方法 func Convert(where string) int64 { var ret int64 switch where { case "begin": ret = sarama.OffsetOldest case "now": ret = sarama.OffsetNewest default: ret = sarama.OffsetNewest } return ret }

mylogic.go file

import ( "bufio" t "cf_build/thrid_party" "github.com/go-ini/ini" "os" "strings" )

type Conf struct { Inter_topics []string Inter_topics_new []string Inter_compact_topics []string Inter_qy_topics []string Inter_update_topic string

China_topic                string
China_update_topic         string
China_crawl_topics         []string
China_crawl_compact_topics []string

Islog_topic string
Iclog_topic string

}

type KafkaFlight struct { Conf Kafka t.Kafka I []t.KafkaConsumer INew []t.KafkaConsumer ICompact []t.KafkaConsumer IQy []t.KafkaConsumer IUpdate []t.KafkaConsumer C []t.KafkaConsumer CUpdate []t.KafkaConsumer CCrawl []t.KafkaConsumer CCrawlCompact []t.KafkaConsumer Islog []t.KafkaConsumer Iclog []t.KafkaConsumer Log t.Log }

//初始化从kafka中读取特价机票信息模块 func (k KafkaFlight) Init(inifile string, log t.Log) error { param, err := ini.Load(inifile) if err != nil { return err }

k.Log = log

//读取国际航线数据
inter_topics_file := param.Section("kafka").Key("inter_topics").String()
var inter_topics []string
f, _ := os.Open(inter_topics_file)
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
    topic := strings.TrimSpace(scanner.Text())
    inter_topics = append(inter_topics, topic)
}

inter_topics_new_file := param.Section("kafka").Key("inter_topic_new").String()
var inter_topics_new []string
f_new, _ := os.Open(inter_topics_new_file)
defer f_new.Close()
scanner_new := bufio.NewScanner(f_new)
for scanner_new.Scan() {
    topic := strings.TrimSpace(scanner_new.Text())
    inter_topics_new = append(inter_topics_new, topic)
}

inter_compact_topics_file := param.Section("kafka").Key("inter_compact_topics").String()
var inter_compact_topics []string
f22, _ := os.Open(inter_compact_topics_file)
defer f22.Close()
scanner22 := bufio.NewScanner(f22)
for scanner22.Scan() {
    topic := strings.TrimSpace(scanner22.Text())
    inter_compact_topics = append(inter_compact_topics, topic)
}

inter_qy_topics_file := param.Section("kafka").Key("inter_qy_topic").String()
var inter_qy_topics []string
f_qy, _ := os.Open(inter_qy_topics_file)
defer f_qy.Close()
scanner_qy := bufio.NewScanner(f_qy)
for scanner_qy.Scan() {
    topic := strings.TrimSpace(scanner_qy.Text())
    inter_qy_topics = append(inter_qy_topics, topic)
}

inter_update_topic := param.Section("kafka").Key("inter_update_topic").String()
inter_update_topic = strings.TrimSpace(inter_update_topic)

//读取国内航线数据
china_crawl_topic_file := param.Section("kafka").Key("china_crawl_topic").String()
var china_crawl_topic []string
f1, _ := os.Open(china_crawl_topic_file)
defer f1.Close()
scanner1 := bufio.NewScanner(f1)
for scanner1.Scan() {
    topic := strings.TrimSpace(scanner1.Text())
    china_crawl_topic = append(china_crawl_topic, topic)
}

china_crawl_compact_topic_file := param.Section("kafka").Key("china_crawl_compact_topic").String()
var china_crawl_compact_topic []string
f11, _ := os.Open(china_crawl_compact_topic_file)
defer f11.Close()
scanner11 := bufio.NewScanner(f11)
for scanner11.Scan() {
    topic := strings.TrimSpace(scanner11.Text())
    china_crawl_compact_topic = append(china_crawl_compact_topic, topic)
}
china_topic := param.Section("kafka").Key("china_topic").String()
china_update_topic := param.Section("kafka").Key("china_update_topic").String()
china_topic = strings.TrimSpace(china_topic)
china_update_topic = strings.TrimSpace(china_update_topic)

//读取更新日志信息
islog_topic := param.Section("kafka").Key("islog_topic").String()
iclog_topic := param.Section("kafka").Key("iclog_topic").String()
islog_topic = strings.TrimSpace(islog_topic)
iclog_topic = strings.TrimSpace(iclog_topic)

//配置conf
k.Conf.Inter_topics = inter_topics
k.Conf.Inter_topics_new = inter_topics_new
k.Conf.Inter_compact_topics = inter_compact_topics
k.Conf.Inter_qy_topics = inter_qy_topics
k.Conf.Inter_update_topic = inter_update_topic
k.Conf.China_topic = china_topic
k.Conf.China_update_topic = china_update_topic
k.Conf.China_crawl_topics = china_crawl_topic
k.Conf.China_crawl_compact_topics = china_crawl_compact_topic
k.Conf.Islog_topic = islog_topic
k.Conf.Iclog_topic = iclog_topic

//读取kafka主机信息
kafka_hosts := param.Section("kafka").Key("hosts").String()
sli := strings.Split(kafka_hosts, ",")
for index, _ := range sli {
    sli[index] = strings.TrimSpace(sli[index])
}
kafka_where := param.Section("kafka").Key("where").String()

kafka_zk := param.Section("kafka").Key("zookeepers").String()
zk_sli := strings.Split(kafka_zk, ",")
for index, _ := range zk_sli {
    zk_sli[index] = strings.TrimSpace(zk_sli[index])
}
kafka_groupname := param.Section("kafka").Key("groupname").String()

k.Kafka = t.Kafka{Hosts: sli, Where: kafka_where, Zookeeper: zk_sli,
    Groupname: kafka_groupname}

return nil

}

//启动模块 func (k *KafkaFlight) Start() error { //启动分组消费 for _, v := range k.Conf.Inter_topics { kafka := k.Kafka if groupconsumer, err := kafka.NewConsumer(v); err == nil { k.I = append(k.I, groupconsumer) go groupconsumer.Dispatcher(k.Log) } }

for _, v := range k.Conf.Inter_topics_new {
    kafka := k.Kafka
    if groupconsumer, err := kafka.NewConsumer(v); err == nil {
        k.INew = append(k.INew, groupconsumer)
        go groupconsumer.Dispatcher(k.Log)
    }
}

for _, v := range k.Conf.Inter_compact_topics {
    kafka := k.Kafka
    if groupconsumer, err := kafka.NewConsumer(v); err == nil {
        k.ICompact = append(k.ICompact, groupconsumer)
        go groupconsumer.Dispatcher(k.Log)
    }
}
for _, v := range k.Conf.Inter_qy_topics {
    kafka := k.Kafka
    if groupconsumer, err := kafka.NewConsumer(v); err == nil {
        k.IQy = append(k.IQy, groupconsumer)
        go groupconsumer.Dispatcher(k.Log)
    }
}

kafka1 := k.Kafka
if groupconsumer, err := kafka1.NewConsumer(k.Conf.Inter_update_topic); err == nil {
    k.IUpdate = append(k.IUpdate, groupconsumer)
    go groupconsumer.Dispatcher(k.Log)
}

kafka2 := k.Kafka
if groupconsumer, err := kafka2.NewConsumer(k.Conf.China_topic); err == nil {
    k.C = append(k.C, groupconsumer)
    go groupconsumer.Dispatcher(k.Log)
}

kafka3 := k.Kafka
if groupconsumer, err := kafka3.NewConsumer(k.Conf.China_update_topic); err == nil {
    k.CUpdate = append(k.CUpdate, groupconsumer)
    go groupconsumer.Dispatcher(k.Log)
}

for _, v := range k.Conf.China_crawl_topics {
    kafka := k.Kafka
    if groupconsumer, err := kafka.NewConsumer(v); err == nil {
        k.CCrawl = append(k.CCrawl, groupconsumer)
        go groupconsumer.Dispatcher(k.Log)
    }
}

for _, v := range k.Conf.China_crawl_compact_topics {
    kafka := k.Kafka
    if groupconsumer, err := kafka.NewConsumer(v); err == nil {
        k.CCrawlCompact = append(k.CCrawlCompact, groupconsumer)
        go groupconsumer.Dispatcher(k.Log)
    }
}
kafka4 := k.Kafka
if groupconsumer, err := kafka4.NewConsumer(k.Conf.Islog_topic); err == nil {
    k.Islog = append(k.Islog, groupconsumer)
    go groupconsumer.Dispatcher(k.Log)
}

kafka5 := k.Kafka
if groupconsumer, err := kafka5.NewConsumer(k.Conf.Iclog_topic); err == nil {
    k.Iclog = append(k.Iclog, groupconsumer)
    go groupconsumer.Dispatcher(k.Log)
}
return nil

}

//停止模块 func (k *KafkaFlight) Stop() { //停止topic消费 for , v := range k.I { v.Close() } for , v := range k.INew { v.Close() } for , v := range k.ICompact { v.Close() } for , v := range k.IQy { v.Close() } for , v := range k.IUpdate { v.Close() } for , v := range k.C { v.Close() } for , v := range k.CUpdate { v.Close() } for , v := range k.CCrawl { v.Close() } for , v := range k.CCrawlCompact { v.Close() } for , v := range k.Islog { v.Close() } for _, v := range k.Iclog { v.Close() } }

when my program start, i start KafkaFlight; and stop KafkaFlight when my program stop. i need to fetch data from kafka continuely, when producer produce data to kafka. the Message() close and returen nil. how can this suit for me ? thanks. best wishes.