apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Writing to CSV create multiple partition file but other files are empty except partition Zero #5467

Open MrDataPsycho opened 1 year ago

MrDataPsycho commented 1 year ago

Describe the bug Hi, I was building a data processing pipeline; the full code is below. When writing the result to CSV, it creates several CSV partition files by default, but all of the data ends up in the first file and the other files are empty. How can I control the partitioning behavior?

Here are the files created by the program:

⇒ ls data/processed/diabetes 
part-0.csv part-1.csv part-2.csv part-3.csv part-4.csv part-5.csv part-6.csv part-7.csv

To Reproduce Here is the full code of the processing:

use datafusion::error::Result;
use datafusion::prelude::*;
use log::info;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use std::fs;
use env_logger;

#[tokio::main]
async fn main() -> Result<()>{
    env_logger::init();
    let df = read_diabetes_dataset().await;
    let df = filter_zero_values(df).await;
    df.clone().limit(0, Some(5))?.show().await?;
    run_pipeline(df).await?;
    Ok(())
}

async fn read_diabetes_dataset() -> DataFrame{
    let path = "data/raw/diabetes.csv";
    let schema = Schema::new(vec![
        Field::new("Pregnancies", DataType::Int32, false),
        Field::new("Glucose", DataType::Int32, false),
        Field::new("BloodPressure", DataType::Int32, false),
        Field::new("SkinThickness", DataType::Int32, false),
        Field::new("Insulin", DataType::Int32, false),
        Field::new("BMI", DataType::Float32, false),
        Field::new("DiabetesPedigreeFunction", DataType::Float32, false),
        Field::new("Age", DataType::Int32, false),
        Field::new("Outcome", DataType::Int8, false),
    ]);
    let csv_read_options = CsvReadOptions{schema: Some(&schema), ..Default::default()};
    let ctx = SessionContext::new();
    let df = ctx.read_csv(path.to_string(), csv_read_options).await.unwrap();
    info!("File read successfully from {}", path);
    df
}

async fn filter_zero_values(df: DataFrame) -> DataFrame {
    df.filter(col("Glucose").gt(lit(0))).unwrap()
        .filter(col("BloodPressure").gt(lit(0))).unwrap()
        .filter(col("SkinThickness").gt(lit(0))).unwrap()
        .filter(col("Insulin").gt(lit(0))).unwrap()
        .filter(col("BMI").gt(lit(0.0))).unwrap()
        .filter(col("Age").gt(lit(0))).unwrap()
}

async fn run_pipeline(df: DataFrame) -> Result<()> {
    let write_path = "data/processed/diabetes";
    match fs::remove_dir_all(write_path) {
        Ok(_) => info!("Write path already existed and was removed!"),
        Err(_) => info!("No file found while cleaning the write path!")
    };
    info!("Row count before processing.");
    let result = df.clone().aggregate(vec![], vec![count(col("Pregnancies"))])?;
    result.show_limit(1).await?;
    let df = filter_zero_values(df).await;
    let result = df.clone().aggregate(vec![], vec![count(col("Pregnancies"))])?;
    info!("Row count after processing.");
    result.show_limit(1).await?;
    df.write_csv(write_path).await?;
    info!("File written successfully into {}", write_path);
    Ok(())
}

I did check the repartitioning API docs, but there is no specific example there. The data can be downloaded from the Kaggle diabetes dataset.

Dep. list:

[dependencies]
tokio = "1.26.0"
datafusion = "19.0.0"
log = "0.4.0"
env_logger = "0.9.0"

Expected behavior The write should create only one partition file unless I explicitly ask for more partitions or repartition the data frame.

Jefffrey commented 1 year ago

I think the following should be able to coalesce into a single partition:

df.repartition(Partitioning::RoundRobinBatch(1))?

As for the CSV writing empty files, see #5383 for some discussion around that