apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6.3k stars 1.19k forks source link

target_partitions execution option is ignored when the input has 1 partition #12611

Closed palaska closed 1 month ago

palaska commented 1 month ago

Describe the bug

target_partitions execution option is ignored when the input has 1 partition. The introduced condition here is the root cause.

To Reproduce

use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::arrow::datatypes::{DataType, Schema};
use datafusion::prelude::col;
use datafusion::functions_aggregate::sum::sum;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::test_util::scan_empty;

let config = SessionConfig::new().with_target_partitions(4);
let ctx = Arc::new(SessionContext::new_with_config(config));
let session_state = ctx.state();

let schema = Schema::new(vec![
    Field::new("a", DataType::Utf8, false),
    Field::new("b", DataType::UInt64, false),
]);

let logical_plan = scan_empty(None, &schema, Some(vec![0, 1]))
    .unwrap()
    .aggregate(vec![col("a")], vec![sum(col("b"))])
    .unwrap()
    .build()
    .unwrap();

let optimized_plan = session_state.optimize(&logical_plan).unwrap();

let plan = session_state
    .create_physical_plan(&optimized_plan)
    .await
    .unwrap();

println!(
    "{}",
    DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);

Above code produces a physical plan with a single output partition and the plan doesn't contain a RepartitionExec.

use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::arrow::datatypes::{DataType, Schema};
use datafusion::prelude::col;
use datafusion::functions_aggregate::sum::sum;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::test_util::scan_empty_with_partitions;

let config = SessionConfig::new().with_target_partitions(4);
let ctx = Arc::new(SessionContext::new_with_config(config));
let session_state = ctx.state();

let schema = Schema::new(vec![
    Field::new("a", DataType::Utf8, false),
    Field::new("b", DataType::UInt64, false),
]);

// setting the number of input partitions to 2 instead of 1
let logical_plan = scan_empty_with_partitions(None, &schema, Some(vec![0, 1]), 2)
    .unwrap()
    .aggregate(vec![col("a")], vec![sum(col("b"))])
    .unwrap()
    .build()
    .unwrap();

let optimized_plan = session_state.optimize(&logical_plan).unwrap();

let plan = session_state
    .create_physical_plan(&optimized_plan)
    .await
    .unwrap();

println!(
    "{}",
    DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);

When the input partition number is set to a value > 1, it works as expected. (multi_partitions becomes true here)

Expected behavior

No response

Additional context

No response

akurmustafa commented 1 month ago

In datafusion, target_partition argument doesn't necessarily increase partition count each time. If DataFusion thinks that executing the query in single partition is better in terms of performance, it will do so even if target_partition number is larger than 1. Do you think, parallelism will improve the performance for this query, if you think so we should definitely increase partition for this query. What is your thoughts in this regard?

In short, setting target_partitions to larger than 1 doesn't necessarily increase partition in datafusion.

palaska commented 1 month ago

In datafusion, target_partition argument doesn't necessarily increase partition count each time. If DataFusion thinks that executing the query in single partition is better in terms of performance, it will do so even if target_partition number is larger than 1. Do you think, parallelism will improve the performance for this query, if you think so we should definitely increase partition for this query. What is your thoughts in this regard?

In short, setting target_partitions to larger than 1 doesn't necessarily increase partition in datafusion.

Thanks for the explanation! I agree that optimizing for performance makes sense, as long as it doesn't compromise guarantees or hurt system predictability. In Ballista, a "task" is generated for each partition, and changing this behavior has caused some unit test assertions to fail. However, I don't think this is a major issue for Ballista. I’m not familiar with how this flag is being used in other systems, but @alamb might have some insights to share.

alamb commented 1 month ago

I think the reason it is called "target_partitions" is that is is not a guarantee but instead is a target used for performance optimizations as @akurmustafa mentions

If you need more than one partition you can always modify the plan / set the required input partitions

palaska commented 1 month ago

Thanks for the clarifications, closing this one.