Implement stage-level resourceProfile auto-adjust framework to avoid OOM
Background
In our production environment, we suffer a lot from Gluten jobs occasionally throwing heap OOM exceptions.
We have dug into these problems, and there are two major kinds of problems causing our jobs to throw OOM:
1. The stage contains fallback operators, e.g. UDAFs or other not-yet-supported functions or operators, which require more heap memory than configured.
2. The stage contains no fallback operator but has a very heavy upstream exchange. Here heavy means the upstream exchange has a huge M x N shuffle status (M is the shuffle mapper count and N the reducer count). When this stage begins its shuffle read, the executor side must keep the whole mapStatuses of the upstream shuffle status; when M x N is large, this very likely causes a heap OOM exception.
The root cause is that, for now, all stages in the same Spark application share the same task heap/off-heap memory config, and the problem appears when different stages require different off-heap/heap fractions. Since https://github.com/apache/incubator-gluten/issues/4392 proposed a potential solution to this type of problem, we did some verification based on that idea.
Design
Introduce a ResourceProfile setter in WholeStageTransformer and ColumnarShuffleExchangeExec
Since all underlying native computation gets triggered from WholeStageTransformer or from ColumnarShuffle, we can add
    @transient private var resourceProfile: Option[ResourceProfile] = None

    def withResources(rp: ResourceProfile): Unit = {
      this.resourceProfile = Some(rp)
    }
in WholeStageTransformer, and when doExecuteColumnar gets called, set the resourceProfile on the RDD before it is returned:
    if (resourceProfile.isDefined) {
      logInfo(s"set resource profile ${resourceProfile.get} for child $child")
      rdd.withResources(resourceProfile.get)
    }
    rdd
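For illustration, here is a minimal sketch of how an adjusted ResourceProfile could be built and handed to the transformer via this setter. The memory sizes and the glutenWholeStage variable are illustrative assumptions, not values produced by the actual rule:

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    // Trade some off-heap memory for a larger on-heap size, e.g. for a stage
    // that contains fallback (row-based) operators. Sizes are placeholders.
    val executorReqs = new ExecutorResourceRequests()
      .cores(4)
      .memory("8g")        // larger on-heap executor memory for this stage
      .offHeapMemory("4g") // reduced off-heap share for this stage
    val taskReqs = new TaskResourceRequests().cpus(1)

    val adjustedRP = new ResourceProfileBuilder()
      .require(executorReqs)
      .require(taskReqs)
      .build()

    // glutenWholeStage is the WholeStageTransformer node of the stage being tuned.
    glutenWholeStage.withResources(adjustedRP)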
Introduce GlutenDynamicAdjustStageRP in HeuristicApplier
When AQE is enabled, we can check all operators in this stage and collect the child query stages belonging to this stage, if any.
After we have collected all plan nodes belonging to this stage, we know whether any fallback exists, and we can also calculate the shuffle status complexity to roughly estimate mapStatus memory occupation. The rule works in the following steps:
1. Collect all plan nodes belonging to this stage.
2. Analyze the plan node details, gathering whether a fallback exists and whether child query stages exist.
3. Generate a new resource profile.
3.1 Get the default resource profile from sparkContext.resourceProfileManager and initialize task and executor resource requests based on the default profile.
3.2 Adjust the memory/off-heap requests.
4. Handle the different scenarios for resource profile adjustment (a rough sketch follows this list).
Scenario 1: Fallback exists. If both existsC2RorR2C and existsGlutenOperator are true, try to apply the new resource profile to the detailed plans.
Scenario 2: Shuffle status consideration. Filter the detailed plans to get only the ShuffleQueryStageExec instances. If there are any, calculate the complexity of the stage's shuffle status based on the number of mappers and reducers in each ShuffleQueryStageExec. If the calculated complexity meets or exceeds the threshold from the glutenConfig, apply the new resource profile to the detailed plans.
5. Apply the new resource profile if needed.
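A rough sketch of the scenario-2 check, assuming a hypothetical complexityThreshold read from the Gluten config and a hypothetical applyNewResourceProfile helper; the actual rule may compute the complexity differently:

    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec

    // Estimate the mapStatuses footprint of the stage's shuffle reads as the sum of
    // M x N over all upstream shuffle query stages feeding this stage.
    def shuffleStatusComplexity(stagePlans: Seq[SparkPlan]): Long =
      stagePlans
        .collect { case s: ShuffleQueryStageExec => s.shuffle }
        .map(shuffle => shuffle.numMappers.toLong * shuffle.numPartitions.toLong)
        .sum

    // complexityThreshold and applyNewResourceProfile are hypothetical names.
    if (shuffleStatusComplexity(detailedPlans) >= complexityThreshold) {
      applyNewResourceProfile(detailedPlans, adjustedRP)
    }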
We have completed a PoC of this design, which indeed solved these two types of OOM problems; we are refactoring the code and plan to contribute it to the community.
Requirements
AQE must be enabled.
The stage-level resource scheduling conditions must be met:
- Executor dynamic allocation is enabled, i.e. spark.dynamicAllocation.enabled must be true.
- The underlying resource scheduler must support dynamically allocating executors.
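For reference, a minimal configuration that satisfies these requirements might look like the following; shuffle tracking is just one way to enable dynamic allocation, an external shuffle service works as well:

    spark.sql.adaptive.enabled=true
    spark.dynamicAllocation.enabled=true
    spark.dynamicAllocation.shuffleTracking.enabled=true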
Other Potential Benefits
Provides a new way to specify other resources, e.g. GPUs, for a stage.
External tuning systems can intervene through this mechanism.
FAQ
What if a stage contains multiple WholeStageTransformers? Will the multiple resource profiles conflict with each other?
Multiple resource profiles can be merged through Spark's built-in mechanism.
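Note that when multiple ResourceProfiles are attached to RDDs of the same stage, Spark only merges them (taking the max of each resource) if the following config is enabled, otherwise it throws an exception; the rule would likely depend on it:

    spark.scheduler.resource.profileMergeConflicts=true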
What if a stage falls back entirely, which means there is no chance to set a ResourceProfile for this stage?
Potential solutions: a) Wrap the whole fallen-back plan with a wrapper node that provides an interface and the ability to set a ResourceProfile; b) Make the default resource profile suitable for whole-stage-fallback stages, so nothing needs to be set for such stages.
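For option a), a hypothetical sketch of such a wrapper node; the node name and placement are assumptions, not part of the current PoC:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.resource.ResourceProfile
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

    // Wraps a fully fallen-back (vanilla Spark) stage: it only forwards execution
    // and attaches the configured ResourceProfile to the produced RDD.
    case class ResourceProfileWrapperExec(child: SparkPlan) extends UnaryExecNode {
      @transient private var resourceProfile: Option[ResourceProfile] = None

      def withResources(rp: ResourceProfile): Unit = {
        this.resourceProfile = Some(rp)
      }

      override def output: Seq[Attribute] = child.output

      override protected def doExecute(): RDD[InternalRow] = {
        val rdd = child.execute()
        resourceProfile.foreach(rp => rdd.withResources(rp))
        rdd
      }

      override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
        copy(child = newChild)
    }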
Other questions?
We'd love to hear more thoughts and receive more comments about this idea!