apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Need to add a unique id to MetricsSet or ExecutionPlanMetricsSet #3033

Open mingmwang opened 1 year ago

mingmwang commented 1 year ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

This is useful for the Ballista Scheduler to merge the metrics of the plan operators from all the Executor task results. Today this is a missing piece in Ballista: the task results returned from the Executors do not report any of the plan node execution metrics back to the Scheduler. Once that reporting is added, the metrics need to be merged correctly, which requires a way to uniquely identify which metrics belong to which plan operator, so that only metrics belonging to the same plan node are merged on the Scheduler side.
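Concretely, if each metric set carried a unique operator id, the Scheduler-side merge could be sketched like this (a minimal sketch with hypothetical simplified types; the real MetricsSet lives in datafusion::physical_plan::metrics, and the id does not exist yet):

```rust
use std::collections::HashMap;

// Hypothetical per-task metric report; the real payload would be a
// DataFusion MetricsSet. Keyed by an assumed unique operator id.
type TaskReport = HashMap<u64, u64>; // operator id -> output row count

// Merge the reports from all tasks: values are summed only when they
// share the same operator id, i.e. describe the same plan node.
fn merge_reports(reports: &[TaskReport]) -> HashMap<u64, u64> {
    let mut merged: HashMap<u64, u64> = HashMap::new();
    for report in reports {
        for (&op_id, &rows) in report {
            *merged.entry(op_id).or_insert(0) += rows;
        }
    }
    merged
}
```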

@alamb Please share your thoughts and advice.


alamb commented 1 year ago

As I understand it, you are trying to identify all metrics for a particular query execution within a session

The session_id on SessionState already identifies the session, but a single session can run multiple queries

What about adding a statement_id to the SessionState? Like:

/// Execution context for registering data sources and executing queries
#[derive(Clone)]
pub struct SessionState {
    /// Uuid for the session
    pub session_id: String,
    /// Statement id -- each query / statement increases this number. <----- this is NEW
    pub statement_id: usize,
    /// Responsible for optimizing a logical plan
    pub optimizer: Optimizer,
   ... all the same ...
}

Then you could use the combination of (session_id, statement_id) to connect up metrics across nodes?
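Using that combination as a correlation key could look roughly like this (a sketch; only session_id and statement_id come from the proposal above, the rest is hypothetical):

```rust
use std::collections::HashMap;

// Sketch of grouping metric reports by (session_id, statement_id) as
// proposed above; the metric payload here is just a row count.
type QueryKey = (String, usize); // (session_id, statement_id)

fn group_metrics(reports: Vec<(QueryKey, u64)>) -> HashMap<QueryKey, u64> {
    let mut grouped: HashMap<QueryKey, u64> = HashMap::new();
    for (key, rows) in reports {
        // Reports from different nodes with the same key belong to the
        // same statement and are summed together.
        *grouped.entry(key).or_insert(0) += rows;
    }
    grouped
}
```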

mingmwang commented 1 year ago

@alamb A single session could have multiple statements running concurrently, and a statement_id is not enough to differentiate metrics either. For example, a single query statement might contain multiple join nodes or multiple parquet scan nodes; each operator node has its own metric set, and each node is run by multiple Executor instances across many tasks. When the Scheduler combines the metrics, it needs a way to correctly identify which metrics come from the exact same operator, so that only those can be combined and merged.
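One way to get such per-operator identity is to number the plan nodes deterministically. A sketch, assuming a simplified plan node type (the real type would implement DataFusion's ExecutionPlan trait):

```rust
// Hypothetical minimal plan node for illustration only.
struct PlanNode {
    name: &'static str,
    children: Vec<PlanNode>,
}

// Assign a unique id to every operator via a pre-order traversal, so two
// nodes of the same kind (e.g. two parquet scans) stay distinguishable.
fn assign_ids(node: &PlanNode, next_id: &mut usize, out: &mut Vec<(usize, &'static str)>) {
    let id = *next_id;
    *next_id += 1;
    out.push((id, node.name));
    for child in &node.children {
        assign_ids(child, next_id, out);
    }
}
```

Because the traversal order is deterministic, the same plan produces the same ids on every node, which is what makes the ids usable as a merge key.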

mingmwang commented 1 year ago

If we do not have unique ids to represent the MetricsSet, we have to use some other implicit way to correlate the metrics. One solution is to leverage the array index. For example, after an Executor finishes a task, it traverses the plan tree and collects all the MetricsSets into an array, where the order of the MetricsSets matters. On the Scheduler side, when multiple tasks report back their MetricsSets, they are merged based on the array index. This solution assumes that, for a given plan stage, the MetricsSet arrays returned from all the tasks have the same length.

This is true in the current DataFusion code base, because the real physical plan is the same for all tasks. But if in the future we want to apply some kind of micro adaptive execution optimization, the real physical plans might differ slightly, and then the MetricsSet array lengths might not match. Another case is on the Executor side: we might apply an optimization where, if a MetricsSet is all trivial zeros, the Executor does not report it back, to reduce the TaskResult size and save metric-combining cost in the Scheduler.
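The index-based scheme and its fragility can be sketched as follows (hypothetical simplified types, not the actual Ballista code; note the merge must bail out as soon as two tasks report arrays of different lengths):

```rust
// Hypothetical stand-in for DataFusion's MetricsSet, reduced to two counters.
#[derive(Clone, Debug)]
struct MetricsSet {
    output_rows: u64,
    elapsed_nanos: u64,
}

impl MetricsSet {
    // Merge metrics that are assumed to describe the same operator.
    fn merge(&mut self, other: &MetricsSet) {
        self.output_rows += other.output_rows;
        self.elapsed_nanos += other.elapsed_nanos;
    }
}

// Merge per-task metric arrays by position: index i in every task's array
// is assumed to refer to the same operator in the stage's physical plan.
// Returns None when the assumption is violated (arrays of unequal length).
fn merge_by_index(tasks: Vec<Vec<MetricsSet>>) -> Option<Vec<MetricsSet>> {
    let mut iter = tasks.into_iter();
    let mut merged = iter.next()?;
    for task in iter {
        if task.len() != merged.len() {
            return None; // plans diverged, index correlation is unsound
        }
        for (m, t) in merged.iter_mut().zip(task.iter()) {
            m.merge(t);
        }
    }
    Some(merged)
}
```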

@thinkharderdev @yahoNanJing

mingmwang commented 1 year ago

https://github.com/apache/arrow-ballista/issues/116

alamb commented 1 year ago

A single session could have multiple statements run concurrently. And a statement_id is not enough to differ metrics either.

That is a good point. I still think statement_id is likely necessary -- it sounds like it would also be prudent to add an execution_plan_id or operator_id to ExecutionPlan or LogicalPlan to properly identify the execution? I can't remember if Ballista sends dyn ExecutionPlans or LogicalPlans to the executors.

One solution is to leverage the array index. For example after the Executor finish a task, traverse the plan tree and collect all the MetricsSets into an Array of MetricsSet.

Yeah, as you noted above, one potential issue with this approach is that it will only work if the exact same ExecutionPlan is created on each node.

thinkharderdev commented 1 year ago

A single session could have multiple statements run concurrently. And a statement_id is not enough to differ metrics either.

That is a good point. I still think statement_id is likely necessary -- it sounds like it would also be prudent to add an execution_plan_id or operator_id to ExecutionPlan or LogicalPlan to properly identify the execution? I can't remember if Ballista sends dyn ExecutionPlans or LogicalPlans to the executors.

On Ballista we send the ExecutionPlan and the ShuffleWriteExec should already have a job_id attached.

mingmwang commented 1 year ago

job_id

Yes, the job_id in Ballista is similar to statement_id. In the current Ballista implementation, each SQL statement triggers exactly one job.