IBM / sarama

Sarama is a Go library for Apache Kafka.

High GC pressure due to numerous temporary objects in high-concurrency consumption scenario #2951

Open · xiaomudk opened this issue 3 months ago

xiaomudk commented 3 months ago
Description

Hello Sarama maintainers, I'm encountering high GC pressure when consuming Kafka messages with Sarama in a high-concurrency scenario of roughly 100K TPS.

[pprof profile image]

pprof heap

```
Type: alloc_objects
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top20
Showing nodes accounting for 345968905, 96.71% of 357746798 total
Dropped 298 nodes (cum <= 1788733)
Showing top 20 nodes out of 34
      flat  flat%   sum%        cum   cum%
 104992531 29.35% 29.35%  214004373 59.82%  github.com/IBM/sarama.(*MessageSet).decode
 101894004 28.48% 57.83%  101894004 28.48%  github.com/IBM/sarama.(*partitionConsumer).parseMessages
  96394846 26.94% 84.78%  211677820 59.17%  github.com/IBM/sarama.(*MessageBlock).decode
   5355012  1.50% 86.27%    5355012  1.50%  github.com/IBM/sarama.(*FetchRequest).AddBlock
   5266207  1.47% 87.74%  224329016 62.71%  github.com/IBM/sarama.(*FetchResponse).decode
   4128776  1.15% 88.90%    5161101  1.44%  github.com/eapache/go-xerial-snappy.DecodeInto
   3522638  0.98% 89.88%    3522638  0.98%  github.com/IBM/sarama.(*realDecoder).push
   3173296  0.89% 90.77%    3173296  0.89%  github.com/IBM/sarama.makeResponsePromise
```
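
For reference, a heap profile like the one above can be captured by exposing the standard net/http/pprof endpoints in the consumer process; the sketch below is only illustrative (the listen address and the bare select are placeholders for our actual service):

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on the default mux
)

func main() {
	// Profiling endpoint; the allocation profile can then be inspected with:
	//   go tool pprof -sample_index=alloc_objects http://localhost:6060/debug/pprof/heap
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	select {} // stand-in for the real consumer loop
}
```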

After analyzing the pprof heap profile, I've identified several areas of concern related to frequent memory allocations during message decoding. Here are the key findings:

1. In (*MessageSet).decode:
```go
func (ms *MessageSet) decode(pd packetDecoder) (err error) {
    ms.Messages = nil                                 // reset to nil on every decode

    for pd.remaining() > 0 {
        .....

        msb := new(MessageBlock)              // a new MessageBlock is allocated for every message
        err = msb.decode(pd)
        if err == nil {
            ms.Messages = append(ms.Messages, msb)       // append grows the slice dynamically
        } else if errors.Is(err, ErrInsufficientData) {
            ...
        } else {
            return err
        }
    }

    return nil
}
```

Additionally, I noticed that the ms.Messages slice is reset to nil in each decode call. This approach might lead to unnecessary allocations and GC pressure. Would it be beneficial to reuse the existing slice instead of creating a new one each time? Also, is there a way to pre-allocate the slice with an estimated capacity to reduce the number of dynamic expansions?
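
To make the slice-reuse idea concrete, here is a small self-contained sketch of the pattern I have in mind; the block type and the batch size are stand-ins, not Sarama's types, and the numbers are only illustrative:

```go
package main

import "fmt"

type block struct{ payload []byte }

// decodeReuse keeps the backing array between calls (dst[:0]) instead of
// resetting the slice to nil, so steady-state batches append without
// reallocating once the slice has grown to the typical batch size.
func decodeReuse(dst []*block, n int) []*block {
	dst = dst[:0]
	for i := 0; i < n; i++ {
		dst = append(dst, &block{})
	}
	return dst
}

func main() {
	var buf []*block
	for i := 0; i < 3; i++ {
		buf = decodeReuse(buf, 1000)
		fmt.Println("len:", len(buf), "cap:", cap(buf)) // cap stays stable after the first call
	}
}
```

Resetting to nil instead forces append to re-grow the backing array on every call, which seems consistent with the (*MessageSet).decode numbers in the profile above.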

2. In (*MessageBlock).decode:
```go
func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
    ......

    msb.Msg = new(Message)                   // a new Message object is created for each message block
    if err = msb.Msg.decode(pd); err != nil {
        return err
    }

    if err = pd.pop(); err != nil {
        return err
    }

    return nil
}
```
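
If the maintainers think object reuse is viable here, a sync.Pool is the usual pattern for cutting per-message allocations like msb.Msg = new(Message). The sketch below uses a stand-in type and is not a proposed Sarama API; whether pooling is actually safe depends on how long decoded Message values stay referenced after parsing:

```go
package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for a decoded message object.
type message struct {
	key, value []byte
}

// messagePool recycles message structs so steady-state decoding stops
// allocating one fresh object per message block.
var messagePool = sync.Pool{
	New: func() any { return new(message) },
}

func decodeOne(key, value []byte) *message {
	m := messagePool.Get().(*message)
	m.key, m.value = key, value
	return m
}

// release must only be called once the caller is completely done with m;
// returning an object that is still referenced elsewhere would corrupt data.
func release(m *message) {
	*m = message{} // drop references so the pooled object does not pin memory
	messagePool.Put(m)
}

func main() {
	m := decodeOne([]byte("k"), []byte("v"))
	fmt.Println(string(m.key), string(m.value))
	release(m)
}
```

The trade-off is that pooled objects need a clear release point, which may not exist once messages have been handed to the application.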
3. In (*partitionConsumer).parseMessages:
```go
func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
    var messages []*ConsumerMessage           // should pre-allocate the slice with an estimated capacity
    for _, msgBlock := range msgSet.Messages {
        for _, msg := range msgBlock.Messages() {
            .......
            messages = append(messages, &ConsumerMessage{          // new ConsumerMessage objects are created for each parsed message
                Topic:          child.topic,
                Partition:      child.partition,
                Key:            msg.Msg.Key,
                Value:          msg.Msg.Value,
                Offset:         offset,
                Timestamp:      timestamp,
                BlockTimestamp: msgBlock.Msg.Timestamp,
            })
            child.offset = offset + 1
        }
    }
    if len(messages) == 0 {
        child.offset++
    }
    return messages, nil
}
```
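
For the pre-allocation comment above, len(msgSet.Messages) looks like a reasonable capacity hint for the output slice (each block yields at least one candidate message, more when it is a compressed wrapper). A self-contained sketch of the idea with stand-in types:

```go
package main

import "fmt"

type consumerMessage struct{ offset int64 }

// parseBlocks sizes the output slice from the block count up front, so the
// common uncompressed case (one message per block) appends without growing.
func parseBlocks(blockOffsets []int64) []*consumerMessage {
	messages := make([]*consumerMessage, 0, len(blockOffsets))
	for _, off := range blockOffsets {
		messages = append(messages, &consumerMessage{offset: off})
	}
	return messages
}

func main() {
	msgs := parseBlocks([]int64{10, 11, 12})
	fmt.Println(len(msgs), cap(msgs)) // 3 3
}
```

If the hint turns out to be too small, append simply grows the slice as it does today, so the compressed path should not be hurt.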

Are there any optimization strategies or best practices you could recommend to reduce these allocations and mitigate the GC pressure? I'm open to suggestions on how to improve the performance in this high-concurrency scenario. Thank you for your time and assistance.

Versions
| Sarama | Kafka  | Go   |
|--------|--------|------|
| 1.43.2 | 0.10.2 | 1.22 |
Configuration
```go
conf := sarama.NewConfig()
conf.Version = sarama.V0_10_2_0
conf.Consumer.Offsets.Initial = sarama.OffsetNewest
```
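
For completeness, here is a minimal sketch of how the consumer is wired up from this config; the broker address, topic, and partition below are placeholders, and the real service spreads the work across many goroutines:

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	conf := sarama.NewConfig()
	conf.Version = sarama.V0_10_2_0
	conf.Consumer.Offsets.Initial = sarama.OffsetNewest

	// Broker address, topic and partition are placeholders for our real setup.
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, conf)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("example-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		_ = msg // the real handler does the actual work
	}
}
```

Nothing unusual in the setup itself, so the allocations above appear to come from the decode path rather than from anything on our side.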
Logs

Additional Context
github-actions[bot] commented 1 week ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.