confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

High CPU Utilisation #468

Open Sourabh721992 opened 6 years ago

Sourabh721992 commented 6 years ago

I am using version 0.11.3 and have created four threads, one per consumer (there are 4 consumers), all subscribing to the same topic and the same group (so a single consumer group).

I am seeing random CPU consumption, with values like 0, 3, 19, 64, etc.; most of the time the usage is in double figures.

The code is below; please suggest a fix.

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace AxesKafkaConsumer
{

class Program
{
    static dynamic config;

    static void Main(string[] args)
    {
        #region KafkaConsumerConfiguration

        config = new Dictionary<string, object>
        {
            { "bootstrap.servers", ConfigurationManager.AppSettings["KakfaServer"].ToString() },
            { "group.id", ConfigurationManager.AppSettings["kafkaConsumerGrp"].ToString() },
            { "default.topic.config", new Dictionary<string, object>
                {
                    { "auto.offset.reset", "smallest" }
                }
            }
        };
        #endregion

        #region KafkaConsumers

        Thread thKakfaConsumer1 = new Thread(new ThreadStart(Consumer1));
        thKakfaConsumer1.Start();

        Thread thKakfaConsumer2 = new Thread(new ThreadStart(Consumer2));
        thKakfaConsumer2.Start();

        Thread thKakfaConsumer3 = new Thread(new ThreadStart(Consumer3));
        thKakfaConsumer3.Start();

        Thread thKakfaConsumer4 = new Thread(new ThreadStart(Consumer4));
        thKakfaConsumer4.Start();

        #endregion
    }

    static void Consumer1()
    {
        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.OnMessage += (_, msg)
                =>
            {
                Console.WriteLine($"Message value: {msg.Value}");
            };

            consumer.Subscribe(ConfigurationManager.AppSettings["kafkaTopic"].ToString());

            consumer.OnPartitionEOF += (_, end)
            => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}.");

            while (true)
            {
                consumer.Poll(TimeSpan.FromSeconds(1));
            }
        }
    }

    static void Consumer2()
    {
        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.OnMessage += (_, msg)
                =>
            {
                Console.WriteLine($"Message value: {msg.Value}");
            };

            consumer.Subscribe(ConfigurationManager.AppSettings["kafkaTopic"].ToString());

            while (true)
            {
                consumer.Poll(TimeSpan.FromSeconds(1));
            }
        }
    }

    static void Consumer3()
    {
        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.OnMessage += (_, msg)
                =>
            {
                Console.WriteLine($"Message value: {msg.Value}");
            };

            consumer.Subscribe(ConfigurationManager.AppSettings["kafkaTopic"].ToString());

            while (true)
            {
                consumer.Poll(TimeSpan.FromSeconds(1));
            }
        }
    }

    static void Consumer4()
    {
        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.OnMessage += (_, msg)
                =>
            {
                Console.WriteLine($"Message value: {msg.Value}");
            };

            consumer.Subscribe(ConfigurationManager.AppSettings["kafkaTopic"].ToString());

            while (true)
            {
                consumer.Poll(TimeSpan.FromSeconds(1));
            }
        }
    }
}

}
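
The four Consumer methods above are identical apart from the OnPartitionEOF handler in Consumer1. For reference, a minimal consolidated sketch of the same loop run on four threads (the RunConsumer name and the id parameter are hypothetical, not from the original project):

    // Hypothetical consolidation of Consumer1..Consumer4: one shared loop run on four threads.
    static void RunConsumer(int id)
    {
        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.OnMessage += (_, msg)
                => Console.WriteLine($"[{id}] Message value: {msg.Value}");

            consumer.OnPartitionEOF += (_, end)
                => Console.WriteLine($"[{id}] Reached end of topic {end.Topic} partition {end.Partition}.");

            consumer.Subscribe(ConfigurationManager.AppSettings["kafkaTopic"].ToString());

            while (true)
            {
                consumer.Poll(TimeSpan.FromSeconds(1));
            }
        }
    }

    // In Main, after building the config:
    // for (int i = 1; i <= 4; i++)
    // {
    //     int id = i;                              // capture a copy for the lambda
    //     new Thread(() => RunConsumer(id)).Start();
    // }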


mhowlett commented 6 years ago

The code looks fine... what throughput are we talking about?

What platform (Windows or Linux)? If Windows, it would be interesting to know whether you are able to run the same program on Linux (.NET Core) with less CPU.

Finally, you could try this CI build of Confluent.Kafka that uses a more recent build of librdkafka: https://www.nuget.org/packages/Confluent.Kafka/0.11.3-ci-303

Sourabh721992 commented 6 years ago

Hi,

The CPU percentage should be close to 0-1 percent.

I am running this on a Windows machine and, sorry, I don't have a Linux platform available.

I have tried the 0.11.2-ci-303 build, but in vain; the consumption is still in random double digits like 12, 64, 19, 27, 43.
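
For reference, a hypothetical helper (not part of the program above; it would sit in the same Program class and needs using System.Diagnostics) that samples the process's own CPU usage once a second, which tends to give steadier numbers than watching Task Manager:

    // Hypothetical helper: samples this process's CPU usage over one-second intervals.
    static void ReportCpuUsage()
    {
        var process = Process.GetCurrentProcess();
        var stopwatch = Stopwatch.StartNew();
        TimeSpan lastCpu = process.TotalProcessorTime;

        while (true)
        {
            Thread.Sleep(1000);
            process.Refresh();
            TimeSpan currentCpu = process.TotalProcessorTime;

            // CPU time consumed, divided by (wall time x logical core count), as a percentage.
            double cpuPercent = (currentCpu - lastCpu).TotalMilliseconds
                                / (Environment.ProcessorCount * stopwatch.Elapsed.TotalMilliseconds)
                                * 100.0;
            Console.WriteLine($"CPU: {cpuPercent:F1}%");

            lastCpu = currentCpu;
            stopwatch.Restart();
        }
    }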

R4cOOn commented 6 years ago

Hi,

We are seeing the same high CPU utilisation. We host our Kafka consumers in a local Service Fabric cluster on the full .NET Framework.

One of the clusters works fine and has the expected 1-2% CPU usage when entering the consume loop. That Kafka cluster is not secured.

The other clusters have the same hardware and show a 30-40% CPU spike when looping (same code deployed); those Kafka clusters are secured, though. Running the same code on my development Service Fabric cluster against the secured Kafka cluster shows the expected 1-2% blip when entering the loop.

From time to time the services drive the Service Fabric cluster to 100% CPU. When that happens the cluster can't respond anymore and we have to restart the Kafka consumer services.

I'm not sure how to move forward and isolate the issue.

Cheers.

edenhill commented 6 years ago

It is hard to give any specific advice on this. Try to locate which part of the runtime is consuming the CPU by running a CPU profiler, or configure the client with something like { "debug", "broker,topic,msg" }.
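
A minimal sketch of what that could look like with the Dictionary config from the original post; "debug" is a standard librdkafka property, and the log lines should come back through the consumer's OnLog event in the 0.11.x API:

        // Sketch: same config as the original post, plus librdkafka debug logging
        // for broker I/O, topic/partition state and message flow.
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", ConfigurationManager.AppSettings["KakfaServer"].ToString() },
            { "group.id", ConfigurationManager.AppSettings["kafkaConsumerGrp"].ToString() },
            { "debug", "broker,topic,msg" },
            { "default.topic.config", new Dictionary<string, object>
                {
                    { "auto.offset.reset", "smallest" }
                }
            }
        };

        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            // Debug output arrives via OnLog; write it somewhere you can inspect.
            consumer.OnLog += (_, logMessage)
                => Console.WriteLine($"{logMessage.Facility} {logMessage.Name}: {logMessage.Message}");

            // ... Subscribe and Poll as in the original code ...
        }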

mhowlett commented 5 years ago

Unfortunately, it looks like #1870, referenced above, didn't make it into 0.11.5. The next step is to test with a custom build once that gets merged to master.