anhnongdan opened this issue 6 years ago
The kernel dies frequently due to failed jobs (Memory Limit exceeded).
To analyze this effectively, I need to break the function down and see how much time each step takes.
1. Aggregate IMEI usage: gr.(pn, imei).agg(imei_usage_fts)
4 tasks failed for imei_usage.repartition(100).cache() with memory exceeded: 4.5/4.5 GB.
1 count job for imei_usage goes through 3 cache stages:
The second cache stage has far too many failed tasks; then the last cache runs very smoothly.
Total: imei usage time: 0:14:31.579744
count: 32,768,611
And I don't think that the DAG is this complex:
Rerunning this code makes the runtime longer. It means something builds up in the system and weighs it down.
imei usage time: 0:17:25.769628
Physical Plan:
== Physical Plan ==
InMemoryColumnarTableScan [phone_number#226L,sub_imei#219,call_duration#227,num_calls#229L,max_call_time#230,min_call_time#231,num_contacts#232L],
InMemoryRelation [phone_number#226L,sub_imei#219,call_duration#227,num_calls#229L,max_call_time#230,min_call_time#231,num_contacts#232L],
true,
10000,
StorageLevel(true, true, false, true, 1),
TungstenExchange RoundRobinPartitioning(100),
None,
None
The code doesn't run much faster without repartitioning.
imei usage time: 0:14:08.893162
== Physical Plan ==
InMemoryColumnarTableScan [phone_number#400L,sub_imei#393,call_duration#401,num_calls#403L,max_call_time#404,min_call_time#405,num_contacts#406L],
InMemoryRelation [phone_number#400L,sub_imei#393,call_duration#401,num_calls#403L,max_call_time#404,min_call_time#405,num_contacts#406L],
true,
10000,
StorageLevel(true, true, false, true, 1),
ConvertToUnsafe,
None
Pretty much the same failed tasks.
Rerunning without repartitioning a second time, the job itself failed!
2. Aggregate gr.(pn) to get num_imei and aggregate total_pn_fts
Proceeding straight to the IMEI aggregation even runs faster.
imei aggregating time: 0:13:10.473043
But still, 28 stages failed due to Memory Limit!
PYSPARK_SUBMIT_ARGS_CUSTOM="--master yarn-client --driver-cores 3 --driver-memory 3g \
--executor-cores 3 --executor-memory 6g \
--conf spark.rpc.message.maxSize=1024 \
--conf spark.kryoserializer.buffer.max=1g \
Rerunning the above with imei_usage.repartition(70) takes the same time:
imei aggregating time: 0:13:00.968838
But only 9 tasks fail this time:
But seeing how the DAG is branched, it seems that some work is duplicated:
There are 2 RDDs cached, as shown in the UI: one is 98 MB and the other 115 MB. These sizes are small, so I still don't know why the Memory Limit would be such a big problem.
| RDD Name | Storage Level | Cached Partitions | Fraction Cached | Size in Memory | Size in ExternalBlockStore | Size on Disk |
|---|---|---|---|---|---|---|
| Project [phone_number#12L,num_imei#362L,imei_list#365,top_imei#374,top_call_duration#376,top_num_calls#375L,top_max_call_time#379,top_min_call_time#380,top_num_contacts#381L,top_num_calls_pct#377,... | Memory Deserialized 1x Replicated | 200 | 100% | 98.6 MB | 0.0 B | 0.0 B |
| Project [phone_number#12L,num_imei#100L,sum_num_calls#101L,sum_call_duration#102,imei_list#103,sub_imei#5,call_duration#13,num_calls#15L,max_call_time#16,min_call_time#17,num_contacts#18L] +- SortMergeJoin [phone_number#12L], [phone_number#110L] :- Project [phone_number#12L,num_imei#100L,sum_num_calls#101L,sum_call_duration#102,imei_list#103] : +- ... RoundRobinPartitioning(70), None, None | Memory Deserialized 1x Replicated | 200 | 100% | 115.2 MB | 0.0 B | 0.0 B |
Memory Limit exceeded still happened:
ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 7.0 GB of 7 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
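Following the suggestion in the YARN message itself, one option is to raise the off-heap overhead in the submit args; the 1024 MB value here is an assumption, not a tested setting:

```shell
# spark.yarn.executor.memoryOverhead (MB) is reserved on top of
# --executor-memory; YARN kills the container once the sum is exceeded.
--conf spark.yarn.executor.memoryOverhead=1024
```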
--
3. Get pn with num_imei > 1, rank top/latest, and join top/latest
Then proceeding with this only takes 22 seconds:
imei aggregating time: 0:00:22.851197
It seems that all calculation steps before multi_imei_usage are skipped.
The last step of the process (cloning columns for single_imei, unioning, and writing the file) takes too long:
time total: 0:16:45.020129
Then, trying to cache subs and avoid counting subs_single_imei:
time total: 0:01:57.315595
Back to the storage problem: I unpersist all DataFrames, but there are 2 small DataFrames still in the executors' memory. The other 2 big ones (over 2.5 GB) are really freed.
subs_multi_imei_ready.unpersist() #>> This leaked ~98MB
multi_imei_usage.unpersist() #>> This leaked ~110MB
I need some systematic method to prevent memory leaks like this.
I eliminated all cache() calls on intermediate DataFrames (single_imei_usage, multi_imei_usage, etc.). At first, parallel computation happened.
=> But the total run time is actually longer:
time total: 0:33:35.782078
The reason is that I didn't count() after cache().
And finally, adding a count() after each cache():
time total: 0:12:09.626836
(I think this matches the performance of the production weekly_graph calculation when computing for 1 day only.)
https://10.8.0.1:8806/notebooks/thangnguyen/04_Raw_Features/IMEI_vs_TAC.ipynb
The calculation is pretty heavy when run on the full population. I need to extract people who used multiple IMEIs in a day and rank over [num_calls, duration] and [call min time] to get the top IMEI and the latest IMEI.
On VT, tc_histories for 1 day is around 400M records.