wvanbergen / kafka

Load-balancing, resuming Kafka consumer for go, backed by Zookeeper.
MIT License
373 stars 141 forks source link

Bug? Consuming stops or hangs at certain time. #110

Open windless0530 opened 7 years ago

windless0530 commented 7 years ago

I have met a problem:

Code written using wvanbergen/kafka consumes messages successfully at first, but after receiving 872 messages, the consuming loop ended. I am sure all 872 messages are processed successfully by another goroutine.

Then I restarted my program, still no message could be consumed.

While at this time, on kafka manager, the consumer offset stops and never grows, while the log size and total lag num are constantly growing.

If I use kafka consuming test script kafka-console-consumer.sh, with the same zk/consumer group, all messages could be successfully consumed.

So I could just consider it as a bug...

BTW, the code is displayed as following (The "received %d msg" log printed at first but now never prints again, and the log "Message terminated." never ever printed even once):

BTW again, the same thing happened last week, and I stopped my program at that time. Today I ran my program again, at first everything was right, but then stopped again.

`consumer, consumerErr := consumergroup.JoinConsumerGroup(consumerGroupName, kafkaTopics, p.zooServer, config) if consumerErr != nil { log.Fatalln(consumerErr) }

go func() {
    for err := range consumer.Errors() {
        log.Printf("receive consumer error: %s\n", err)
        if consumer.Closed() {
            consumer, _ = consumergroup.JoinConsumerGroup(consumerGroupName, kafkaTopics, p.zooServer, config)
        }
    }
}()

eventCount := 0
offsets := make(map[string]map[int32]int64)
go func() {
    log.Infof("Consumer group: close = %t, %s", consumer.Closed(), consumer)
    for message := range consumer.Messages() {
        if offsets[message.Topic] == nil {
            offsets[message.Topic] = make(map[int32]int64)
        }
        eventCount += 1
        if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
            log.Printf("unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
        }

        log.Infof("received %d msg", eventCount)
        p.recordRecv()
        p.msgChan <- message.Value

        offsets[message.Topic][message.Partition] = message.Offset
        consumer.CommitUpto(message)
    }
    log.Infoln("Message terminated.")
}()`