confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client

Consumer 10x slower than kafkacat #490

Open · lexasoft123 opened this issue 4 years ago

lexasoft123 commented 4 years ago

Description

@edenhill please take a look. I am implementing a consumer for a large topic with very small messages (20-40 bytes). If I consume the topic with kafkacat I get 600 Mbit/s of throughput; if I consume it with the Go client, I get 60 Mbit/s.
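For reference, a kafkacat invocation along these lines produces such a measurement (broker and topic names are placeholders; the exact command used is not shown in this post):

    kafkacat -C -b b-1.kafka -t topic -p 0 -o beginning -e > /dev/null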

Setting fetch.message.max.bytes=100000000 and queued.min.messages=10000000 makes consumption spike to 500 Mbit/s and then pause.

The library itself seems to consume well, but the per-message overhead in Go is very high with messages this small.

Parallel consumption is not an option: there is only one partition, and the messages must be processed in order.

How to reproduce

Code is simple:

    package main

    import (
        "fmt"
        "log"
        "time"

        "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    )

    func main() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":  "",
            "group.id":           "test-group",
            "auto.offset.reset":  "earliest",
            "enable.auto.commit": false,
        })
        if err != nil {
            panic(err)
        }
        if err := c.SubscribeTopics([]string{"topic"}, nil); err != nil {
            panic(err)
        }

        receivedMessages := 0
        for {
            msg, err := c.ReadMessage(time.Second * 10)
            if err != nil {
                log.Println("Read Error:", err)
                continue
            }
            receivedMessages++

            if receivedMessages%1000000 == 0 {
                fmt.Printf("Received %d messages, last message time is: %s\n", receivedMessages, msg.Timestamp.Format(time.RFC3339))
            }
        }
    }


edenhill commented 4 years ago

CGo, Go's glue between C and Go code, unfortunately has a very high per-call overhead: it treats every C call, even one that cannot block, as if it could block, and therefore saves the current Go stack, switches stacks, and so on. That is probably what you are seeing.
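To see that cost in isolation, here is a minimal standalone sketch (not part of this thread, and independent of Kafka) that times an empty C call from Go:

    package main

    /*
    static void noop(void) {}
    */
    import "C"

    import (
        "fmt"
        "time"
    )

    func main() {
        const n = 10000000
        start := time.Now()
        for i := 0; i < n; i++ {
            C.noop() // every call crosses the Go/C boundary
        }
        elapsed := time.Since(start)
        fmt.Printf("%d CGo calls in %v (~%d ns/call)\n", n, elapsed, elapsed.Nanoseconds()/int64(n))
    }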

One way to alleviate this is the batch consumer interface, but that comes with its own problem of outdated messages (after a rebalance, etc). If you don't need balanced consumer groups you can use Assign() instead of Subscribe(); then no rebalances occur and the batch consumer interface (either channel- or function-based) can be used safely, as sketched below.
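A minimal sketch of that switch, assuming c was created as in the original post (topic, partition, and starting offset are illustrative):

    topic := "topic"
    // Static assignment: no consumer group rebalances occur, so the
    // batch/channel consumer interfaces can be used without the
    // outdated-message problem.
    if err := c.Assign([]kafka.TopicPartition{
        {Topic: &topic, Partition: 0, Offset: kafka.OffsetBeginning},
    }); err != nil {
        panic(err)
    }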

lexasoft123 commented 4 years ago

@edenhill thank you for your answer. I've tried the Python client and hit the same speed problem. It seems workloads of such tiny messages are simply inefficient for consumers built on bindings to rdkafka's C interface.

By the way, CPU is not the bottleneck: I see 30-50% utilization.

(pprof) top
Showing nodes accounting for 1080ms, 76.60% of 1410ms total
Showing top 10 nodes out of 106
      flat  flat%   sum%        cum   cum%
     480ms 34.04% 34.04%      550ms 39.01%  runtime.cgocall
     320ms 22.70% 56.74%      320ms 22.70%  runtime.futex
      40ms  2.84% 59.57%       40ms  2.84%  runtime.casgstatus
      40ms  2.84% 62.41%      180ms 12.77%  runtime.mallocgc
      40ms  2.84% 65.25%       40ms  2.84%  runtime.memclrNoHeapPointers
      40ms  2.84% 68.09%       50ms  3.55%  time.now
      30ms  2.13% 70.21%      180ms 12.77%  gopkg.in/confluentinc/confluent-kafka-go.v1/kafka.(*handle).newMessageFromFcMsg
      30ms  2.13% 72.34%       60ms  4.26%  runtime.cgoCheckArg
      30ms  2.13% 74.47%       50ms  3.55%  runtime.heapBitsSetType
      30ms  2.13% 76.60%       30ms  2.13%  runtime.nextFreeFast

Are you planning to do anything about this problem?

edenhill commented 4 years ago

Python's slowness lies in its object creation.

Use rdkafka_performance from the librdkafka source tree to get a baseline for how fast you can consume from your cluster, then compare that number with what the Go and Python clients achieve.

lexasoft123 commented 4 years ago

@edenhill Here it is:

~/rdkafka/librdkafka-1.4.2/examples$ ./rdkafka_performance -C -G perf -t topic -p 0 -b b-1.kafka
% Waiting for group rebalance..
% 0 messages (0 bytes) consumed in 0ms: 0 msgs/s (0.00 MB/s)
% 0 messages (0 bytes) consumed in 0ms: 0 msgs/s (0.00 MB/s)
% 0 messages (0 bytes) consumed in 0ms: 0 msgs/s (0.00 MB/s)
% 0 messages (0 bytes) consumed in 0ms: 0 msgs/s (0.00 MB/s)
% 0 messages (0 bytes) consumed in 0ms: 0 msgs/s (0.00 MB/s)
% 0 messages (0 bytes) consumed in 0ms: 0 msgs/s (0.00 MB/s)
% Group rebalanced: 1 partition(s) assigned
% 979435 messages (30752934 bytes) consumed in 1000ms: 979434 msgs/s (30.75 MB/s)
% 1964445 messages (61645872 bytes) consumed in 2000ms: 982222 msgs/s (30.82 MB/s)
% 2935212 messages (92166382 bytes) consumed in 3000ms: 978403 msgs/s (30.72 MB/s)
% 3910789 messages (122813417 bytes) consumed in 4000ms: 977696 msgs/s (30.70 MB/s)
% 4862988 messages (152711904 bytes) consumed in 5000ms: 972596 msgs/s (30.54 MB/s)
% 5832513 messages (183155065 bytes) consumed in 6000ms: 972084 msgs/s (30.53 MB/s)
% 6812336 messages (213931535 bytes) consumed in 7002ms: 972793 msgs/s (30.55 MB/s)
% 7790386 messages (244593041 bytes) consumed in 8002ms: 973450 msgs/s (30.56 MB/s)
% 8769698 messages (275324477 bytes) consumed in 9002ms: 974101 msgs/s (30.58 MB/s)
% 9749008 messages (306204907 bytes) consumed in 10002ms: 974621 msgs/s (30.61 MB/s)
% 10732721 messages (337122898 bytes) consumed in 11002ms: 975448 msgs/s (30.64 MB/s)
% 11712734 messages (367880989 bytes) consumed in 12002ms: 975828 msgs/s (30.65 MB/s)
% 12699545 messages (398872128 bytes) consumed in 13008ms: 976283 msgs/s (30.66 MB/s)
% 13681573 messages (429712551 bytes) consumed in 14008ms: 976693 msgs/s (30.68 MB/s)
^C% Group rebalanced: 1 partition(s) revoked
% 13793460 messages (433223217 bytes) consumed in 14124ms: 976555 msgs/s (30.67 MB/s)
% Average application fetch latency: 1us

So raw rdkafka consumption speed is great, as I said in the first post. Is there any way to reach that speed from the language bindings?

lexasoft123 commented 4 years ago

@edenhill do you have any ideas on how to speed up the Go consumer?

edenhill commented 4 years ago

Did you try the batch consume interface?

lexasoft123 commented 4 years ago

@edenhill I've just tried it.

    topic1 := "topic"
    err = c.Assign([]kafka.TopicPartition{{Topic: &topic1, Partition: 0}})

Consumption speed is the same, around 50-60 Mbit/s (versus the 30 MB/s the rdkafka perf test achieves).

edenhill commented 4 years ago

Use the channel-based consumer (which does batching internally) and see if it improves throughput: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/consumer_channel_example/consumer_channel_example.go
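For reference, the core of that example combined with a static assignment looks roughly like this (a sketch, not a drop-in replacement; broker and topic names are placeholders):

    package main

    import (
        "fmt"

        "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    )

    func main() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": "",
            "group.id":          "test-group",
            "auto.offset.reset": "earliest",
            // Deliver messages on the c.Events() channel, which lets the
            // binding hand them over in larger internal batches.
            "go.events.channel.enable": true,
            "go.events.channel.size":   10000,
        })
        if err != nil {
            panic(err)
        }

        topic := "topic"
        if err := c.Assign([]kafka.TopicPartition{{Topic: &topic, Partition: 0}}); err != nil {
            panic(err)
        }

        received := 0
        for ev := range c.Events() {
            switch e := ev.(type) {
            case *kafka.Message:
                received++
                if received%1000000 == 0 {
                    fmt.Printf("Received %d messages, offset %v\n", received, e.TopicPartition.Offset)
                }
            case kafka.Error:
                fmt.Println("Error:", e)
            }
        }
    }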

nit: Using the same unit for comparisons helps, either Mbit or MB.

lexasoft123 commented 4 years ago

@edenhill I've combined the Assign call with the channel-reading code from the example. Same speed: 7 MB/s instead of 30 MB/s (units now matched, per your remark).

edenhill commented 4 years ago

It is primarily the number of CGo calls that causes the slowdown, and those happen per message, so do you have message-rate numbers?

lexasoft123 commented 4 years ago

You can see the message rates in the rdkafka consumer output above:

% 979435 messages (30752934 bytes) consumed in 1000ms: 979434 msgs/s (30.75 MB/s)
% 1964445 messages (61645872 bytes) consumed in 2000ms: 982222 msgs/s (30.82 MB/s)
% 2935212 messages (92166382 bytes) consumed in 3000ms: 978403 msgs/s (30.72 MB/s)
% 3910789 messages (122813417 bytes) consumed in 4000ms: 977696 msgs/s (30.70 MB/s)
% 4862988 messages (152711904 bytes) consumed in 5000ms: 972596 msgs/s (30.54 MB/s)
% 5832513 messages (183155065 bytes) consumed in 6000ms: 972084 msgs/s (30.53 MB/s)
% 6812336 messages (213931535 bytes) consumed in 7002ms: 972793 msgs/s (30.55 MB/s)
% 7790386 messages (244593041 bytes) consumed in 8002ms: 973450 msgs/s (30.56 MB/s)
% 8769698 messages (275324477 bytes) consumed in 9002ms: 974101 msgs/s (30.58 MB/s)

The real produce rate is much lower, but there is a huge backlog in the topic (tens of billions of messages) that the software has to work through.

edenhill commented 4 years ago

Okay, so Go is doing roughly 220k msgs/s.

Try disabling CGo's runtime checks by setting export GODEBUG=cgocheck=0 in the shell that runs the consumer. Any difference in throughput?

lexasoft123 commented 4 years ago

@edenhill no throughput difference with that option. If you want, I can privately share some test data for you to try.

edenhill commented 4 years ago

IIRC, 220k msgs/s roughly matches the throughput we saw in earlier Go client tests; the main culprit is the CGo call overhead, which we have already cut down to a minimum.

I recommend experimenting with multiple consumers to share the load.

lexasoft123 commented 4 years ago

Unfortunately that is not possible, because the data must be processed sequentially... I do see a 2-3x throughput increase when I add the following to the consumer options:

        "fetch.message.max.bytes": 100000000,
        "queued.min.messages":     10000000,

With these settings the consumption rate swings between 0 and 50 MB/s, but overall processing is noticeably faster.
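For context, a sketch of how those options slot into the ConfigMap from the first post (broker address elided as before):

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":       "",
        "group.id":                "test-group",
        "auto.offset.reset":       "earliest",
        "enable.auto.commit":      false,
        "fetch.message.max.bytes": 100000000, // allow much larger fetch responses
        "queued.min.messages":     10000000,  // keep a deep prefetch queue in librdkafka
    })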