IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License
11.57k stars · 1.76k forks

Sarama batch consuming mode leads to 100% CPU usage #1972

Closed xue610315921 closed 1 year ago

xue610315921 commented 3 years ago
github.com/Shopify/sarama v1.27.1

Versions

| Sarama  | Kafka | Go   |
|---------|-------|------|
| v1.27.1 | v1.0  | 1.15 |
Configuration

```go
saramaConfig.Version = sarama.V1_0_0_0
saramaConfig.Consumer.Return.Errors = true

saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = "xxxx"
saramaConfig.Net.SASL.Password = "xxxx"
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512

saramaClient, err := sarama.NewClient(brokers, saramaConfig)
saramaConsumer, err := sarama.NewConsumerGroupFromClient(group, saramaClient)
```
Logs

```
no error log
```

Problem Description

Is this the correct way to implement batch consuming? The code is here:

```go
func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error) {
	rm := ratelimit.New(c.config.QpsLimitationPerPartition)
	msgArray := make([]*sarama.ConsumerMessage, 0, c.config.BatchNumber)
	for {
		select {
		case msg := <-claim.Messages():
			if msg == nil {
				continue
			}

			rm.Take()
			msgArray = append(msgArray, msg)

			if len(msgArray) < c.config.BatchNumber {
				continue // batch number not reached, keep reading from the buffer
			}

			go c.executeBatchConsumeFunc(sess, msgArray)                        // pass last msg for batch commit
			msgArray = make([]*sarama.ConsumerMessage, 0, c.config.BatchNumber) // make a new slice
		default:
			if len(msgArray) != 0 {
				// control goroutine num
				_, ok := <-c.channel
				if !ok { // channel may be closed
					return
				}

				go c.executeBatchConsumeFunc(sess, msgArray)                        // pass last msg for batch commit
				msgArray = make([]*sarama.ConsumerMessage, 0, c.config.BatchNumber) // make a new slice
			}
			// no new msg from the channel, wait for a period
			time.Sleep(time.Millisecond * time.Duration(c.config.BatchTimeInterval))
		}
	}
}
```

Even after I set the rate limit to 50, the CPU usage of the (Docker) instance was still 100%. Here is the pprof output; I don't know why there are so many `chanrecv` calls here, and the lock & unlock inside them cost a lot of resources.

[pprof screenshot: 271624210345_pic_hd]

From what I remember while debugging, it seems I received a lot of nil values from the channel (`claim.Messages()`). Could that cause this, since I just `continue` when receiving nil? I can't reproduce it on my debugging machine now.

Thanks a lot!

fxrlv commented 3 years ago

Sometimes Kafka decides to rebalance a consumer group. The docs say that in this case the message channel will be closed, and receiving from a closed channel returns nil.
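A minimal stdlib-only sketch of that behavior, with a hypothetical `Msg` type standing in for `*sarama.ConsumerMessage`: once a channel of pointers is closed, every receive returns nil immediately, so a bare `continue` on nil spins the `for`/`select` loop at 100% CPU.

```go
package main

import "fmt"

// Msg is a stand-in for *sarama.ConsumerMessage (hypothetical type).
type Msg struct{ Value string }

func main() {
	ch := make(chan *Msg, 1)
	ch <- &Msg{Value: "hello"}
	close(ch) // Sarama closes claim.Messages() on rebalance

	// The first receive drains the buffered message normally.
	m, ok := <-ch
	fmt.Println(m.Value, ok) // hello true

	// Every subsequent receive completes immediately with nil, false.
	// A loop that just `continue`s on nil therefore never blocks again
	// and burns a full core.
	m, ok = <-ch
	fmt.Println(m == nil, ok) // true false
}
```

The comma-ok form of the receive is what distinguishes "no message yet" from "channel closed", which is the check missing in the code above.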

bingoabs commented 2 years ago

@fxrlv so do we need to restart the consumer manually or not? Thanks.

fxrlv commented 2 years ago

If you get nil from the message channel, you should return from ConsumeClaim.

Then check the error returned from the Consume method you called before.

A nil error means you can call Consume again and get new claims.

github-actions[bot] commented 1 year ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

dnwe commented 1 year ago

i.e., https://github.com/IBM/sarama/blob/dedd86d84f2d56a7c3b604ea36da479c3d2a80bb/examples/consumergroup/main.go#L180-L204