dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports multi-topic fetch and Kafka security. Reliable offset management in ZooKeeper. No data loss. No dependency on HDFS or WAL. Built-in PID rate controller. Supports message handlers. Offset lag checker.
Apache License 2.0

High CPU usage #14

Closed jedisct1 closed 9 years ago

jedisct1 commented 9 years ago

Hi Dibyendu,

While the low-level Kafka consumer works fine, it also seems to require a suspiciously high amount of CPU cycles, even on a beefy, bare-metal machine.

With a single consumer reading from a single partition, CPU usage on the machine running the consumer goes up to 25% to read 4 MB/s.

After bumping kafka.partitions.number to 50, still with a single consumer, and with the same data pushed to a single partition at a 4 MB/s rate, CPU usage jumps to 100% and never goes down.

Tweaking `consumer.fetchsizebytes` and `consumer.fillfreqms` didn't make a difference.
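For reference, these knobs are passed to the consumer through a `java.util.Properties` object. A minimal sketch of the settings discussed in this thread (the key names follow the consumer's property conventions; the values are illustrative, not recommendations):

```java
import java.util.Properties;

// Sketch of the tuning knobs discussed in this thread.
// Values are illustrative only.
public class ConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("kafka.partitions.number", "50");      // partitions in the topic
        props.put("consumer.fetchsizebytes", "1048576"); // bytes fetched per poll
        props.put("consumer.fillfreqms", "200");         // poll interval in ms
        return props;
    }

    public static void main(String[] args) {
        System.out.println(ConsumerProps.build().getProperty("consumer.fetchsizebytes"));
    }
}
```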

Is the Kafka consumer supposed to be that slow? If this is not the case, how may I help diagnose what is going on?

dibbhatt commented 9 years ago

Hi,

I will take a look at what is going on.

Just for your information, the kafka.partitions.number property is probably misleading. It is meant to be set to the number of partitions in your topic, but even if you do not provide it, the consumer still calculates the partition count on its own, so this value will not make any difference.

How many partitions does your topic have?

If you want one receiver per partition, you can launch that many receivers. Say your topic has 50 partitions; launching 50 receivers will distribute the load:

```java
int numberOfReceivers = 50;

JavaDStream unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
```

Just try this option and let me know if that helps.

jedisct1 commented 9 years ago

Hi,

I will eventually have data on 50 partitions, and more receivers, but right now the data is only pushed to a single partition.

It's really not much (4 MB/s), and I would expect a single receiver to handle it without requiring that much CPU; even the Kafka client for PHP is way faster.

Increasing the number of receivers wouldn't help here, as the data is going to a single partition.

In the first setup, I had kafka.partitions.number set to 1. In the second setup, I still push the same data, and only to a single partition, but I had kafka.partitions.number set to 50 because I am planning to eventually use 50 partitions, with way more data than 4 MB/s. But right now 49 partitions are inactive, so CPU usage shouldn't go through the roof.

dibbhatt commented 9 years ago

Hi,

For the receiver-based consumer, the Spark receiver task always runs on one executor core.

For your second setup, you specified kafka.partitions.number as 50. How many receivers did you specify? Is that also 50, or still 1?

The issue here is that even if the other 49 partitions have no data, the receiver task is a long-running process that keeps checking every Kafka partition for new data every FillFreqMs. This polling leads to the CPU usage.
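The cost of that idle polling can be put in back-of-the-envelope terms (this is an illustrative sketch, not the connector's actual code): a single receiver that owns 50 partitions at a 200 ms fill frequency issues 250 fetch requests per second, empty partitions included.

```java
public class PollCost {
    // Fetch requests per second issued by one receiver that owns
    // `partitions` partitions and polls each one every `fillFreqMs` ms,
    // regardless of whether the partition has data.
    static long fetchesPerSecond(int partitions, long fillFreqMs) {
        return partitions * (1000 / fillFreqMs);
    }

    public static void main(String[] args) {
        // 50 partitions polled every 200 ms, empty partitions included.
        System.out.println(fetchesPerSecond(50, 200)); // 250
    }
}
```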

There is a workaround when you want to consume from ONLY one partition: you can use the API below directly. Let's assume partition 0 is the one you want to consume:

```java
JavaDStream oneStream = jsc.receiverStream(
    new KafkaReceiver(props, 0, StorageLevel.MEMORY_ONLY()));
```

Or, if you want to consume ONLY a subset of partitions (ignoring the others), where partitionSet is a Set of Integers holding the partition IDs you want to consume with a single receiver:

```java
JavaDStream myStream = jsc.receiverStream(
    new KafkaRangeReceiver(props, partitionSet, StorageLevel.MEMORY_ONLY()));
```
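The partitionSet itself could be built like this (the partition IDs here are illustrative):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class PartitionSetExample {
    // Illustrative example: consume only partitions 0, 1 and 2
    // with a single receiver.
    public static Set<Integer> build() {
        return new HashSet<>(Arrays.asList(0, 1, 2));
    }

    public static void main(String[] args) {
        System.out.println(PartitionSetExample.build().size());
    }
}
```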

Once all your partitions are getting data and you have enough CPU cores to run all your receivers, you can use the ReceiverLauncher.launch API, which internally distributes the partitions across the receivers.

Dibyendu

jedisct1 commented 9 years ago

Hi Dibyendu,

I am using a single receiver in both cases. So in the second case, there is only one receiver for the 0-49 partition range.

Gotcha. Even when partitions don't have any data, the receiver keeps checking them every FillFreqMs. Still, it's quite surprising that polling 50 partitions every 200 ms requires 100% of the available CPU cycles.

dibbhatt commented 9 years ago

Yes, it is surprising. I will try to create an environment similar to yours, with 50 partitions, only one of them active, and a single receiver task. Can you please let me know which Spark version you are running?

Dibyendu

dibbhatt commented 9 years ago

Sorry, I mistakenly pressed the close button.

jedisct1 commented 9 years ago

Hi Dibyendu,

I am using Spark 1.2.0.

dibbhatt commented 9 years ago

Hi,

I made some changes to fix the high CPU usage when receiving from topics with many partitions. Kindly pull the latest code and let me know if that improves your CPU consumption.

Regards, Dibyendu

jedisct1 commented 9 years ago

Awesome, thanks! Gonna give it a spin right away.

jedisct1 commented 9 years ago

Hi Dibyendu,

CPU usage remains 100% :(

dibbhatt commented 9 years ago

I hope you took the latest code from GitHub, not from the Spark Packages repo.

In my setup, running on AWS with a 4-node Spark 1.2 cluster, I created a Kafka topic with 50 partitions and consumed it with 1 receiver. With the latest fix, CPU usage stays around 8 to 9% for the driver and around 17% for the worker node where the receiver task is running.

I used the same consumer code that is in GitHub:

https://github.com/dibbhatt/kafka-spark-consumer/blob/master/src/main/java/consumer/kafka/client/Consumer.java

Is it possible to share the code you are using?

dibbhatt commented 9 years ago

And also your cluster configuration.

jedisct1 commented 9 years ago

Hi Dibyendu,

My bad, the changes you made yesterday actually make a huge difference. I had overlooked that you had bumped the version number and my code was still loading the previous version.

This, along with disabling the WAL, significantly reduced the CPU usage.

Kudos and thanks for addressing this issue so quickly. You rock.

dibbhatt commented 9 years ago

@jedisct1 glad to hear that it worked for you.

dibbhatt commented 9 years ago

Hi @jedisct1

I have created a JIRA to track the progress of contributing this project back to Apache Spark.

https://issues.apache.org/jira/browse/SPARK-11045

This project is presently in spark-packages, and I believe this is the right time to contribute it to the Apache Spark project, giving the larger community better options for Kafka connectivity in Spark Streaming.

Kindly vote for this JIRA.