IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

How to control the message consumption #1938

Closed: raulnegreiros closed this issue 1 year ago

raulnegreiros commented 3 years ago

Is there a way to control the consumption flow? I know that the Kafka Java API has pause and resume methods, but I can't find anything similar in Sarama. What is the recommended way to pause consumption while the processing side has a problem (a database connection issue, for example)?

dnwe commented 3 years ago

When you call Consume on your consumer group instance, you pass a parent context.Context, which is passed through to the session and can be used to interrupt it and stop consumption: https://github.com/Shopify/sarama/blob/83d633e6e4f71b402df5e9c53ad5c1c334b7065d/consumer_group.go#L44

So you'd cancel that context to break out of the existing consumption and not call client.Consume(...) again until you are ready to re-join the group and consume again.

raulnegreiros commented 3 years ago

thanks for the clarification @dnwe

dnwe commented 3 years ago

Glad I could help!

raulnegreiros commented 3 years ago

Hi @dnwe, I modified the consumer group example to fit the described behaviour, but it seems a partition rebalance is triggered. Maybe it blocks the poll call to the broker, or maybe I have misunderstood the steps.

```golang
package main

// SIGUSR1 toggles between pausing and resuming consumption.

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
)

// ConsumptionControl holds the pause/resume state shared between the
// signal-handling loop in main and the consuming goroutine.
type ConsumptionControl struct {
	isAllowed  bool
	stopping   bool
	controller *sync.Cond
	ctx        context.Context    // context for the current consumption round
	cancelFunc context.CancelFunc // cancels ctx
}

// Sarama configuration options
var (
	brokers  = ""
	version  = ""
	group    = ""
	topics   = ""
	assignor = ""
	oldest   = true
	verbose  = false
)

func init() {
	flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
	flag.StringVar(&group, "group", "", "Kafka consumer group definition")
	flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
	flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma separated list")
	flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
	flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
	flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
	flag.Parse()

	if len(brokers) == 0 {
		panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
	}
	if len(topics) == 0 {
		panic("no topics given to be consumed, please set the -topics flag")
	}
	if len(group) == 0 {
		panic("no Kafka consumer group defined, please set the -group flag")
	}
}

func main() {
	log.Println("Starting a new Sarama consumer")

	if verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	version, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	/**
	 * Construct a new Sarama configuration.
	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
	 */
	config := sarama.NewConfig()
	config.Version = version

	switch assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
	}

	if oldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	/**
	 * Setup a new Sarama consumer group
	 */
	consumer := Consumer{
		ready: make(chan bool),
	}

	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	go func() {
		for err := range client.Errors() {
			fmt.Println("Error: ", err)
		}
		fmt.Println("<<<<<<<<<<<<<<<<<<<<<<<<< exiting error routine ")
	}()

	ctx, cancelConsumption := context.WithCancel(context.Background())
	consumptionController := &ConsumptionControl{
		isAllowed:  true,
		controller: sync.NewCond(&sync.Mutex{}),
		ctx:        ctx,
		cancelFunc: cancelConsumption,
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Block while paused, then pick up the context for this round
			// under the lock (main replaces it on every resume).
			consumptionController.controller.L.Lock()
			for !consumptionController.isAllowed && !consumptionController.stopping {
				fmt.Println("[!!!] pausing the consumption")
				consumptionController.controller.Wait()
				fmt.Println("[!!!] resuming the consumption")
			}
			if consumptionController.stopping {
				consumptionController.controller.L.Unlock()
				return
			}
			roundCtx := consumptionController.ctx
			consumptionController.controller.L.Unlock()

			fmt.Println(">>>>> before consume")
			// `Consume` should be called inside an infinite loop, when a
			// server-side rebalance happens, the consumer session will need to be
			// recreated to get the new claims
			if err := client.Consume(roundCtx, strings.Split(topics, ","), &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			fmt.Println("<<<<< after consume")
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)

	keepRunning := true
	for keepRunning {
		select {
		case <-sigterm:
			log.Println("terminating: via signal")
			keepRunning = false
		case <-sigusr1:
			toggleConsumptionFlow(consumptionController)
		}
	}

	// Cancel the current round (not just the first context) and free the
	// consumer goroutine if it is blocked by the consumption control.
	consumptionController.controller.L.Lock()
	consumptionController.stopping = true
	consumptionController.cancelFunc()
	consumptionController.controller.Broadcast()
	consumptionController.controller.L.Unlock()

	fmt.Println("waiting consumer")
	wg.Wait()

	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

// toggleConsumptionFlow pauses consumption by cancelling the current round's
// context, or resumes it by creating a fresh context and waking the consumer.
func toggleConsumptionFlow(consumptionController *ConsumptionControl) {
	consumptionController.controller.L.Lock()
	defer consumptionController.controller.L.Unlock()

	if consumptionController.isAllowed {
		consumptionController.isAllowed = false
		// Makes client.Consume return, ending the current group session.
		consumptionController.cancelFunc()
	} else {
		consumptionController.isAllowed = true
		consumptionController.ctx, consumptionController.cancelFunc = context.WithCancel(context.Background())
		consumptionController.controller.Broadcast()
	}
	fmt.Println("toggled consumption to ", consumptionController.isAllowed)
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
	fmt.Println("### SETUP. Claims:", session.Claims())
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
	fmt.Println("### CLEANUP. Claims:", session.Claims())
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	fmt.Println(">>>>> consume claim", claim.Topic(), claim.Partition())
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		log.Printf("Message claimed[t: %s p: %d, o: %d]: key = %s value = %s, timestamp = %v",
			message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value), message.Timestamp)
		// simulate the processing time
		time.Sleep(time.Second)
		log.Printf("Processed[t: %s p: %d, o: %d]", message.Topic, message.Partition, message.Offset)
		session.MarkMessage(message, "")
	}
	fmt.Println("<<<<< consume claim", claim.Topic(), claim.Partition())
	return nil
}
```

nikolaydimitrov commented 3 years ago

On top of what @raulnegreiros said, I think in Java it is also possible to pause consumption only for some partition(s). With the suggestion above, you stop consumption of all partitions.
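Sarama later gained per-partition pausing (see the resolution at the end of this thread). Assuming a version where `sarama.ConsumerGroup` has `Pause` and `Resume` methods taking a `map[string][]int32` of topic to partitions, a small hypothetical helper could pause only the affected partitions and hand back a function that undoes exactly that:

```go
package main

// pausePartitions pauses fetching for the given partitions of one topic and
// returns a function that resumes exactly those partitions. pause and resume
// are meant to be method values such as client.Pause and client.Resume on a
// sarama.ConsumerGroup (assumed signature: func(map[string][]int32)).
func pausePartitions(pause, resume func(map[string][]int32), topic string, partitions ...int32) func() {
	// Copy the partitions so later changes by the caller cannot alter what gets resumed.
	set := map[string][]int32{topic: append([]int32(nil), partitions...)}
	pause(set)
	return func() { resume(set) }
}
```

Usage would look like `undo := pausePartitions(client.Pause, client.Resume, "orders", 0, 2)`, where `"orders"` is a hypothetical topic; other assigned partitions keep flowing until `undo()` is called.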

nikolaydimitrov commented 3 years ago

Alternatively, confluent-kafka-go's Consumer.Pause can pause individual partitions: https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html#Consumer.Pause

github-actions[bot] commented 1 year ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

dnwe commented 1 year ago

Resolved by #2005 and available since v1.31.1