criteo / kafka-sharp

A C# Kafka driver
Apache License 2.0

About the performance of clusterclient.consume () in Kafka 0.8.2.1 #50

Closed JoosonP closed 4 years ago

JoosonP commented 4 years ago

The Kafka version I connected to was 2.9.1-0.8.2.1.

Here's a snippet of the code:

```csharp
foreach (var topic in topics)
{
    var partitionIds = clusterClient.GetPartitionforTopicAsync(topic)
        .ConfigureAwait(false).GetAwaiter().GetResult();

    var dicPartitionOffsets = dicTopicPartitionOffsets.ContainsKey(topic)
        ? dicTopicPartitionOffsets[topic]
        : new Dictionary<int, long>();

    foreach (var partitionId in partitionIds)
    {
        var latestOffset = clusterClient.GetLatestOffset(topic, partitionId)
            .ConfigureAwait(false).GetAwaiter().GetResult();
        long currentOffset = 0;
        if (dicPartitionOffsets.ContainsKey(partitionId))
        {
            if (dicPartitionOffsets[partitionId] >= latestOffset)
                currentOffset = latestOffset;
            else
                currentOffset = dicPartitionOffsets[partitionId] + 1;
        }
        else
        {
            currentOffset = latestOffset;
        }

        clusterClient.Consume(topic, partitionId, currentOffset);
    }
}
```

I prepared 50,400 messages. When I executed this procedure, I printed the offset to the console. It took about 23 minutes to print from 1 to 50399, which doesn't seem very efficient. By the way, the configuration is basically the default, and there is only one topic and one partition. How can I improve performance? My goal is 20,000 messages per minute.

(Are there any official performance tests for reference?)

ychuzevi commented 4 years ago

Hi,

From what I understand, your code is fetching messages synchronously, one by one, so it is expected to be rather slow. Instead you can try a simple `clusterClient.ConsumeFromLatest()` and use the callback (`event Action<KafkaRecord<TKey, TValue>> MessageReceived;`) to simply increment a counter. The consumer is rather simple (a lot more effort went into the producer), but even so we have some production instances consuming around 6K msg/second (so ~400K per minute). You should be able to go even higher. The key point is not to perform any costly processing in the callback, as the consumer will wait for the callback to return before fetching the next batch.
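A minimal sketch of that suggestion, assuming the `ClusterClient`, `ConsumeFromLatest`, and `MessageReceived` names quoted in this thread (the exact types and configuration properties may differ between kafka-sharp versions, so treat this as illustrative, not verified):

```csharp
using System;
using System.Threading;
using Kafka.Public; // kafka-sharp public API (assumed namespace)

class ConsumerSketch
{
    static void Main()
    {
        // "broker1:9092" and "myTopic" are placeholders.
        var configuration = new Configuration { Seeds = "broker1:9092" };

        using (var clusterClient = new ClusterClient(configuration, new ConsoleLogger()))
        {
            long counter = 0;

            // Keep the callback cheap: the consumer waits for it to
            // return before fetching the next batch, so any expensive
            // work here directly throttles throughput.
            clusterClient.MessageReceived += record =>
            {
                Interlocked.Increment(ref counter);
            };

            // Subscribe once; batches are pushed to the callback instead
            // of being fetched synchronously one offset at a time.
            clusterClient.ConsumeFromLatest("myTopic");

            Thread.Sleep(TimeSpan.FromMinutes(1));
            Console.WriteLine($"Consumed {Interlocked.Read(ref counter)} messages in one minute");
        }
    }
}
```

If heavier per-message work is unavoidable, a common pattern is to have the callback only enqueue the record into an in-memory queue and do the processing on separate threads, so the fetch loop is never blocked.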