IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License
11.57k stars 1.76k forks source link

Does sarama Consume() will rebalance when all assigned claims has been comsumed #1844

Closed lockfu closed 1 year ago

lockfu commented 3 years ago
Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama Kafka Go
1.27.0 V0_10_2_0 1.14.5
Configuration

type Kafka struct { brokers []string topics []string //OffsetNewest int64 = -1 //OffsetOldest int64 = -2 startOffset int64 version string ready chan bool group string channelBufferSize int }

func NewKafka() *Kafka { return &Kafka{ brokers: brokers, topics: []string{ topics, }, group: group, channelBufferSize: 2, ready: make(chan bool), //version:"1.1.1", } }

var brokers = []string{"xx","xx","xx"} var topics = "xx" var group = "xx"

func (p *Kafka) Init() func() { println("kafka init...")

//version, err := sarama.ParseKafkaVersion(p.version)
//if err != nil {
//  log.Fatalf("Error parsing Kafka version: %v", err)
//}
config := NewSamara.NewConfig()
config.Version = NewSamara.V0_10_2_0
config.Consumer.Group.Rebalance.Strategy = NewSamara.BalanceStrategyRange
config.Consumer.Offsets.Initial = -2                 
config.ChannelBufferSize = p.channelBufferSize 

ctx, cancel := context.WithCancel(context.Background())
client, err := NewSamara.NewConsumerGroup(p.brokers, p.group, config)
if err != nil {
    println("Error creating consumer group client: %v", err)
}

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
    defer func() {
        wg.Done()
        //util.HandlePanic("client.Consume panic", log.StandardLogger())
    }()
    for {
        if err := client.Consume(ctx, p.topics, p); err != nil {
            log.Fatalf("Error from consumer: %v", err)
        }
        // check if context was cancelled, signaling that the consumer should stop
        println(" ------------------ rebalance ----------")
        if ctx.Err() != nil {
            log.Println(ctx.Err())
            return
        }
        p.ready = make(chan bool)
    }
}()
<-p.ready
println("Sarama consumer up and running!...")
return func() {
    println("kafka close")
    cancel()
    wg.Wait()
    if err = client.Close(); err != nil {
        println("Error closing client: %v", err)
    }
}

}

// Setup is run at the beginning of a new session, before ConsumeClaim func (p *Kafka) Setup(NewSamara.ConsumerGroupSession) error { // Mark the consumer as ready close(p.ready) return nil }

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (p *Kafka) Cleanup(NewSamara.ConsumerGroupSession) error { return nil }

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (p *Kafka) ConsumeClaim(session NewSamara.ConsumerGroupSession, claim NewSamara.ConsumerGroupClaim) error {

// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
    //msg := string(message.Value)
    println("-------- consume message")
    time.Sleep(time.Second)
    //run.Run(msg)
    session.MarkMessage(message, "")
}
return nil

}

func main() { k := NewKafka() f := k.Init()

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigterm:
    println("terminating: via signal")
}
f()

} What configuration values are you using for Sarama and Kafka?

Logs

When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.

logs: CLICK ME

``` ```

Problem Description

Hi all, i use sarama1.27.0 to consume topics, and it will consume again when all the claims consumed. so it's a rebalance? Does sarama do not polling claims from kafka. or just get some claims and consume it, and then get claims again with rebalance. thx.

ghost commented 3 years ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.