confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Consumer much slower than producer #1307

Open bichselb opened 2 weeks ago

bichselb commented 2 weeks ago

Description

In my very simple implementation, I can easily produce 1 million messages per second, but can only consume around 100k messages per second. How can I speed up the consumer to be as fast as the producer? I would like to avoid parallelization, as this is also not needed for good producer performance.

Both my producer and consumer follow the README examples very closely.

How to reproduce

I put my whole experimental setup here: https://github.com/bichselb/confluent-kafka-go-performance/tree/slow

My consumer is a straightforward adaptation of the README example:

package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
    start := time.Now()

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "kafka:29092",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })

    if err != nil {
        panic(err)
    }

    err = c.Subscribe("myTopic", nil)

    if err != nil {
        panic(err)
    }

    messagesPerBatch := 1_000_000
    nReceived := 0

    for {
        msg, err := c.ReadMessage(10 * time.Second)
        if err == nil {
            nReceived++
        } else if kErr, ok := err.(kafka.Error); ok && kErr.IsTimeout() {
            fmt.Printf("Timeout after %d messages\n", nReceived)
            break
        } else {
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }

        if (nReceived % messagesPerBatch == 0) && (nReceived > 0) {
            elapsed := time.Since(start)
            perSecond := int(float64(nReceived) / elapsed.Seconds())
            fmt.Printf("Received %d messages within %.2f seconds (average %d messages/sec)\n", nReceived, elapsed.Seconds(), perSecond)
        }
    }

    c.Close()
}


bichselb commented 1 week ago

I was able to make the consumer almost as fast as the producer by setting various configuration properties specified by librdkafka (most importantly fetch.message.max.bytes).

This is my fixed setup: https://github.com/bichselb/confluent-kafka-go-performance/tree/fast
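For reference, a sketch of the kind of consumer tuning involved. The property names come from the librdkafka configuration reference; the specific values below are illustrative and may differ from those in the linked repo:

```go
package main

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func newTunedConsumer() (*kafka.Consumer, error) {
	return kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "kafka:29092",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
		// Allow larger fetch batches per broker request (librdkafka default is 1 MiB).
		"fetch.message.max.bytes": 10_000_000,
		// Let librdkafka's internal queue buffer more messages ahead of the
		// application's ReadMessage calls.
		"queued.min.messages":        1_000_000,
		"queued.max.messages.kbytes": 1_000_000,
	})
}
```

With larger fetches and a deeper prefetch queue, `ReadMessage` mostly drains an in-memory queue instead of waiting on the network.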

OneCricketeer commented 4 days ago

Note - your start time is inclusive of the time to actually connect and configure the client, so your measured msg/s will understate the true consume rate

bichselb commented 3 days ago

Thanks @OneCricketeer for the hint. However, this is not the reason for the slowness (the client configuration time is negligible compared to sending 1 million messages).

bichselb commented 3 days ago

I am leaving this issue open in case someone knows how to close the remaining gap between the producer (~1 million messages/sec) and the consumer (~800k messages/sec).

Feel free to close if this remaining gap is expected.

OneCricketeer commented 2 days ago

Have you tried ZSTD compression? https://blog.cloudflare.com/squeezing-the-firehose/

Used tools like iperf3 to check max network speeds? Monitored cpu/memory/disk/IO rates while consuming?
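If compression turns out to help here, it is enabled on the producer side (consumers decompress automatically). A minimal sketch, assuming the same broker address as the code above:

```go
package main

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func newZstdProducer() (*kafka.Producer, error) {
	return kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "kafka:29092",
		// zstd trades a little CPU for less data on the wire and on disk,
		// which can raise effective consume throughput on network-bound setups.
		"compression.type": "zstd",
	})
}
```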

There's not much information to go on beyond the shown code.

Besides, Kafka ships a built-in consumer perf script (it runs on Java), which will still give you a theoretical upper bound on how fast you could consume.
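For example, the stock perf tool lives in the Kafka distribution's `bin/` directory; adjust the broker address and topic to your setup:

```shell
# Consume 1,000,000 messages from myTopic and report MB/s and msg/s.
bin/kafka-consumer-perf-test.sh \
  --bootstrap-server kafka:29092 \
  --topic myTopic \
  --messages 1000000
```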