Closed MLNW closed 6 years ago
It seems like the problem is that my receivers cache so much data before it can be processed that they run out of memory. I increased their memory and it works for now.
This does not seem to be a good way to go forward since only my receivers need so much memory. The rest of my executors run on like 40KB while my receivers use up to 700MB of their cache.
Is there a way to influence this?
Another weird thing is that when there is an exception my application just dies. YARN should re start the application at least one more time to retry. Is this something within the consumer or who shuts down the application after only one try?
Let me try to explain how this Receiver works and how it manages the memory . Let me try to explain with an example.
Let assume you have 20 Kafka Partitions . And you started ReceiverLauncher with 10 Receiver Tasks. In this case each Receiver Task will pull messages from 2 Kafka Partitions ( using 2 Threads) .
let say you started your job with 10 Spark Executors ( each with some capacity of Cores and Memory). In this case Spark will distribute 10 Receiver Tasks to 10 Executors.
Each Executors Heap is divided into Storage and Execution Memory ( https://0x0fff.com/spark-memory-management/) .
So each of the Receiver task ( pulling from 2 Kafka Partitions ) will use Spark Executor's storage memory fraction of the heap to store the Received Blocks.
Given this, let me explain what all different things can happen.
As Receiver pulls from Kafka , and writes the Block to Spark Block Manager , the Receiver controls it in two way .
fill_frequency = how frequently each receiver will pull messages from Kafka Partitions fetch_size = how large the chunk it will fetch from kafka during every pull
So in above example, let say your fill_frequency is 1 Sec, and fetch_size if 2 MB. So every Receiver Task ( pulling from 2 Kafka Partitions using two Threads) , will pull total 4 MB/ Sec.
Now if your Spark Streaming Batch Duration is say 60 seconds , then for a given batch each Receiver Task will pull ~ 4 x 60 = 240 MB of data from kafka. Hence your Executor's storage memory fraction should be higher than this number .
"Could Not Compute Split" error comes, when Receiver pulls the block from Kafka and stores them in Spark Block Manager, but your Job not processing the block as fast ( let say your batch processing is taking 2 mins) , hence 1 more batch will be queued up in memory, hence it needs more memory.
So Spark has a way to handle this Back Pressure by LRU eviction of blocks from memory. But when Spark does that, you see this "Could Not Compute Split" error if blocks are not evicted to Disk or not replicated.
There are couple of ways to solve this.
First , give enough memory to Executors so that it can hold Batch worth of data. Also use STORAGE_LEVEL = MEMORY_AND_DISK_SER_2 ( it will replicate to other Exeuctor's memory and also use DISK when blocks are LRU evicted )
Second , use the Back Pressure settings in the consumer by setting consumer.backpressure.enabled=true and consumer.min.fetchsizebytes= 1024 ( i.e we are telling Receiver that , Job processing taking longer time , throttle the consume to just 1 KB)
So giving larger Receiver task also helps to reduce memory pressure on executors. I have seen , for N kafka Partitions , having N/2 Receiver is a good settings to starts with. keeping "number of Receiver = number of Executors" will also help to distributed the Receiver task across all Executors.
This is greatly reduced the amount of data written to Spark Block manager. let say your Kafka Payload size ( of a single message is 100KB), but you are just interested on subset of the payload (say subset size is just 10KB) in your Streaming job. You can use this Messagehandler to perform this upfront transformation at receiver side itself, and hence the amount of data written to Spark Block Manager will be much smaller ( 10 times smaller in this case) .
Let me know if this tips will help you. Sorry for a long write-up :)
Hi @MLNW . Let me know if above explanations has helped you to scale your consumer code ?
Thank you for the detailed answer. I'm sure it would help to somewhat scale my consumer code. Problem is that I saw such a big performance degration by using this library that I had to return to doing it myself. I currently average below 4 seconds per batch. Using this API I wasn't even able to get below 1 minute, which is my batch interval.
I am little surprised to hear the performance issue. Whoever switched to this consumer has gained performance over the out of the box Spark consumers. Infact at Instartlogic we did detailed study how much it works better than out of the box library. There are configuration knobs that need to tuned to get best out of it for your use case. If the job is taking longer to compute , there could be various reasons. If you tell me little more about the performance issue I can tell you what all knobs you need to set as default settings certainly not work in most cases.
By looking at your initial message, if you keep Number of Receiver = Number of Executors, that would give you better performance for sure.
Hi @MLNW I am closing this issue . Let me know if you need any further suggestions on how to improve performance and what all other tuning knobs can be tried. Also if you tried some of the options I mentioned, do let me know if things are better for you.
I currently experience the above exception. It seems to happen when receiving data from multiple receivers and unioning said stream of data.
The exceptions always looks like this
```17/11/20 18:52:59 INFO executor.Executor: Finished task 36.0 in stage 131.0 (TID 19339). 872 bytes result sent to driver 17/11/20 18:52:59 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 19341 17/11/20 18:52:59 INFO executor.Executor: Running task 37.0 in stage 131.0 (TID 19341) 17/11/20 18:52:59 ERROR executor.Executor: Exception in task 37.0 in stage 131.0 (TID 19341) java.lang.Exception: Could not compute split, block input-0-1511200296800 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 17/11/20 18:53:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 19343 17/11/20 18:53:00 INFO executor.Executor: Running task 38.0 in stage 131.0 (TID 19343) 17/11/20 18:53:00 ERROR executor.Executor: Exception in task 38.0 in stage 131.0 (TID 19343) java.lang.Exception: Could not compute split, block input-0-1511200297800 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ```The first time it happened after 45 minutes. The second time it happened after 2 minutes. First run used 4 receivers and 12 executors. Second run used 2 receivers and 8 executors.
Have you seen this problem before?