apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0

Submitting same DF to ballista is slow #714

Open luckylsk34 opened 1 year ago

luckylsk34 commented 1 year ago

Describe the bug In Ballista, when I submit the same DataFrame for execution repeatedly, each computation takes about 0.5 seconds (5 seconds for 10 iterations). This happens both in cluster mode and in standalone mode with a single executor. The random.parquet file contains a single column populated with 1 million random integers. When I run the same query with plain DataFusion, it is very fast, and Spark does not show this overhead either. Am I doing something wrong?

To Reproduce Run the code below (imports were omitted; the snippet needs the following):

    use std::env;
    use std::time::SystemTime;
    use ballista::prelude::*;
    use datafusion::prelude::*;

    // For doing the same with plain DataFusion:
    // let ctx = SessionContext::new();

    // Ballista
    let config = BallistaConfig::builder()
        .build()
        .expect("failed to build BallistaConfig");

    // To connect to a remote Ballista scheduler instead:
    // let ctx = BallistaContext::remote("localhost", 50050, &config).await.expect("failed to connect");
    let ctx = BallistaContext::standalone(&config, 4)
        .await
        .expect("failed to start standalone context");

    let df = ctx.read_parquet("./testdata/random.parquet", ParquetReadOptions::default()).await?;
    let args: Vec<String> = env::args().collect();
    let n = args[1].parse::<i32>().expect("first argument must be the iteration count");
    let start = SystemTime::now();
    for _ in 0..n {
        df.clone().aggregate(vec![], vec![sum(col("random_integers"))])?.collect().await?.get(0);
    }
    let end = SystemTime::now();
    println!("{:?}", end.duration_since(start).expect("clock went backwards").as_millis());

    // println!("{}", df.count().await?);
    // df.show_limit(10).await?;
    Ok(())
For comparison, the equivalent Spark (Scala) code, which does not show this per-iteration overhead:

val df = spark.read.parquet("./testdata/random.parquet")
def run_loop(n: Int) = {
    val t0 = System.nanoTime()
    for (a <- 1 to n) {
        df.agg(sum("random_integers")).first
    }
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0) / 1000000 + "ms")
}

Expected behavior Repeated executions of the same DataFrame should incur minimal overhead between calls, as with plain DataFusion or Spark.

Additional context

zhzy0077 commented 1 year ago

Interestingly, there are quite a few places where Ballista polls results from the executor, and the poll interval is a fixed 100 ms, so a query that finishes between polls waits up to 100 ms (about 50 ms on average) per polling round. For comparison, I ran the same code and saw about 250 ms per loop with the out-of-the-box code, and about 160 ms per loop after applying this patch:

--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -259,7 +259,7 @@ async fn execute_query(
             .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
             .into_inner();
         let status = status.and_then(|s| s.status);
-        let wait_future = tokio::time::sleep(Duration::from_millis(100));
+        let wait_future = tokio::time::sleep(Duration::from_millis(1));
         let has_status_change = prev_status != status;
         match status {
             None => {

And it drops further, to around 60 ms per loop, with this patch:

--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -127,7 +127,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         }

         if !active_job {
-            tokio::time::sleep(Duration::from_millis(100)).await;
+            tokio::time::sleep(Duration::from_millis(1)).await;
         }
     }
 }

Maybe Ballista should support configuring this wait period?
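A minimal sketch of what such a knob could look like: parse a millisecond value from an optional configuration entry, falling back to the current hard-coded 100 ms. The setting name `ballista.scheduler.poll_interval_ms` is hypothetical, not an existing Ballista option, and this is only an illustration of the suggestion above, not a proposed patch.

```rust
use std::time::Duration;

// Hypothetical helper: turn an optional configured value (e.g. from a
// BallistaConfig entry such as "ballista.scheduler.poll_interval_ms",
// an assumed name) into the sleep duration used by the poll loops.
// Unset or unparseable values fall back to today's 100 ms default.
fn poll_interval(configured_ms: Option<&str>) -> Duration {
    let ms = configured_ms
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(100);
    Duration::from_millis(ms)
}

fn main() {
    // No setting: matches the current hard-coded behavior.
    assert_eq!(poll_interval(None), Duration::from_millis(100));
    // A latency-sensitive deployment could lower it to 1 ms,
    // as the patches above do by hand.
    assert_eq!(poll_interval(Some("1")), Duration::from_millis(1));
    // Invalid input falls back to the default instead of panicking.
    assert_eq!(poll_interval(Some("abc")), Duration::from_millis(100));
    println!("poll interval handling ok");
}
```

The two `tokio::time::sleep(Duration::from_millis(100))` call sites in the patches would then become `tokio::time::sleep(poll_interval(...)).await`, with the value threaded through from configuration.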