cloudevents / sdk-go

Go SDK for CloudEvents
https://cloudevents.github.io/sdk-go/
Apache License 2.0
828 stars 218 forks source link

After a rebalance, the receiver keeps receiving messages until it errors out #817

Closed nbajaj90 closed 1 year ago

nbajaj90 commented 2 years ago

After a rebalance, the receiver keeps receiving messages until its error out, and most of these messages fails to commit. Do we need to fix the ConsumeClaim to return once the consumer session ends (because of rebalance), so that the receiver won't keep receiving messages after rebalance.

To fix here: https://github.com/cloudevents/sdk-go/blob/6784a56ad84051544d141e597a226e8e48cb8a98/protocol/kafka_sarama/v2/receiver.go#L55

References: https://github.com/Shopify/sarama/issues/2118 https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go

nbajaj90 commented 2 years ago

Expecting something like:

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case msg, ok := <-claim.Messages():
            if !ok {
                return nil
            }
            m := NewMessageFromConsumerMessage(msg)

            r.incoming <- msgErr{
                msg: binding.WithFinish(m, func(err error) {
                    if protocol.IsACK(err) {
                        session.MarkMessage(msg, "")
                    }
                }),
            }
        case <-session.Context().Done():
            return nil
        }
    }
}