bsm / sarama-cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 [DEPRECATED]
MIT License
1.01k stars 222 forks source link

Kafka repeated consumption #293

Closed cafra closed 5 years ago

cafra commented 5 years ago

Hello, author. During the use, there is a problem of Kafka's repeated consumption, and the following is my use.

Kafka cluster is the version 1.0.0;

The timing of repeated consumption is usually when the client restarts. The irrelevant topic corresponds to the client restart, which will cause repeated consumption by other clients. It was previously thought that when the client was closed, the offset was not committed, so the signal was added and the commit was made when it was closed. But still have not solved the problem。

I look forward to your answer.

package kafka

import (
    "log"
    "os"
    "os/signal"
    "strings"

    "github.com/Shopify/sarama"
    "github.com/bsm/sarama-cluster" //support automatic consumer-group rebalancing and offset tracking
)

type Consumer struct {
    signals chan os.Signal
    cli     *cluster.Consumer
}
type Handler func(*sarama.ConsumerMessage) error

func NewConsumer(brokers, topics, group_id string) (consumer *Consumer, err error) {
    consumer = new(Consumer)
    groupID := group_id
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    // 重要!!!!!!!!!!!
    // OffsetNewest:pub 每次启动从队列的最新数据开始消费
    // OffsetOldest: pub 每次启动从队列上次消费的地方开始消费
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    consumer.cli, err = cluster.NewConsumer(strings.Split(brokers, ","), groupID, strings.Split(topics, ","), config)
    if err != nil {
        log.Printf("Failed open consumer: %v", err)
        return
    }

    consumer.signals = make(chan os.Signal, 1)
    signal.Notify(consumer.signals, os.Interrupt)

    go func(c *cluster.Consumer) {
        errors := c.Errors()
        noti := c.Notifications()
        for {
            select {
            case err := <-errors:
                log.Printf("Errors errrs %v", err)
            case <-noti:
                //log.Printf("Notifications errrs %v", tmp)
            }
        }
    }(consumer.cli)
    return
}

// Handler 错误则不commit.下次启动可在此消费
func (c *Consumer) Serve(h Handler) (err error) {
    for {
        select {
        case msg, ok := <-c.cli.Messages():
            if ok {
                if h(msg) != nil {
                    log.Printf("Consumer|Serve handler err=%v", err)
                }
                //注意!!!! 如果panic ,系统重启等,下次都会从上次最后一个数据消费。保证数据不丢失
                c.cli.MarkOffset(msg, "") // mark message as processed
            }
        case <-c.signals:
            c.cli.CommitOffsets()
            return
        }
    }
    return
}
dim commented 5 years ago

Hi, it's a known problem, hence https://github.com/bsm/sarama-cluster#deprecation-notice

cafra commented 5 years ago

Is the current library obsolete? But what is the reason for the repeated consumption? Is there any solution? The link description has been merged into Shopify/sarama. But I don't seem to find it. Is there a specific address, or a demo?