apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Does the DataFusion master branch enable SIMD by default? #1041

Closed jiangzhx closed 1 year ago

jiangzhx commented 3 years ago

Describe the bug: querying DataFusion with the following SQL:

SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat;

Whether or not the simd feature is enabled, there is no difference in performance. Command with simd enabled:

cargo +nightly  run --release --features "simd" --example ssb_query_example

Default command (without the simd feature):

cargo +nightly  run --release --example ssb_query_example

code:

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;

use chrono::prelude::*;

use datafusion::prelude::*;

use futures::StreamExt;

#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[cfg(feature = "mimalloc")]
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let parquet_path = "/Users/sylar/workspace/opensource/ssb-dbgen/output/parquet_10";

    let execution_config = ExecutionConfig::new();
    let mut ctx = ExecutionContext::with_config(execution_config);
    let dt = Local::now();
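    // Register the parquet files at the path above as a table named "lineorder_flat".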
    ctx.register_parquet("lineorder_flat", parquet_path)?;
    // let sql = "select count(distinct S_ADDRESS) from lineorder_flat";
    let sql = r#"SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat;"#;
    let df = ctx.sql(sql)?;
    let results: Vec<RecordBatch> = df.collect().await?;
    print_batches(&results)?;
    println!(
        "usage millis: {}",
        Local::now().timestamp_millis() - dt.timestamp_millis()
    );
    Ok(())
}
houqp commented 3 years ago

Hi, we don't enable SIMD by default because it requires the nightly toolchain, see https://github.com/apache/arrow-datafusion/blob/ab41d8187d25b24af73d56d03ac166a5498922bc/datafusion/Cargo.toml#L40.

I didn't dig into whether enabling SIMD should result in better performance for your particular query though.

Dandandan commented 3 years ago

For most queries I didn't see an impact in DataFusion using the simd feature.

Dandandan commented 3 years ago

If you are looking to speed things up, there are things that I found have a bigger effect than the simd feature.

There is a bit of documentation on them over here: https://arrow.apache.org/datafusion/user-guide/library.html

houqp commented 3 years ago

For most queries I didn't see an impact in DataFusion using the simd feature.

This is quite interesting, because SIMD is one of the big selling points of Databricks' Photon engine. Do you know off the top of your head whether this is expected? I would expect it to at least make a difference for aggregations?

Dandandan commented 3 years ago

For most queries I didn't see an impact in DataFusion using the simd feature.

This is quite interesting, because SIMD is one of the big selling points of Databricks' Photon engine. Do you know off the top of your head whether this is expected? I would expect it to at least make a difference for aggregations?

I am not sure whether SIMD itself has a large impact on queries in the Photon engine. I would expect the vectorized (versus row-based) execution model, and the fact that it is implemented in efficient, modern-CPU-aware C++ rather than code running on the JVM, to make the largest impact, rather than the use of SIMD instructions.

jorgecarleitao commented 3 years ago

Rust does compile to SIMD instructions even without packed_simd (which is what the simd feature enables). I would expect the simd feature to offer more benefit in queries containing null values (which, AFAIK, our benchmarks do not have).
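
To make the null-value point concrete, here is a plain-Rust sketch (illustrative only, not arrow-rs code; the function names are made up): a sum over dense values is easy for the optimizer to auto-vectorize, while a null-aware sum driven by a validity mask adds a data-dependent branch per element, which is harder to auto-vectorize and is where hand-written SIMD kernels tend to help more.

// Illustrative sketch only; this is not arrow-rs code.

// A plain sum over dense values: the optimizer can usually auto-vectorize this loop.
fn sum_dense(values: &[i64]) -> i64 {
    values.iter().sum()
}

// A null-aware sum driven by a validity mask: the per-element filter is data
// dependent, so the compiler has a harder time vectorizing it automatically.
fn sum_with_validity(values: &[i64], valid: &[bool]) -> i64 {
    values
        .iter()
        .zip(valid)
        .filter(|&(_, &v)| v)
        .map(|(x, _)| x)
        .sum()
}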

alamb commented 3 years ago

As @jorgecarleitao and @Dandandan have mentioned, the "simd" feature is somewhat misleadingly named, as it may imply that SIMD instructions (e.g. AVX2, etc) are not used unless that feature is enabled, which is not the case.

Normally the rust compiler will attempt to use SIMD instructions automatically (so called "auto vectorization"), and the suggestion at the link @Dandandan mentions of using RUSTFLAGS='-C target-cpu=native' is probably your best bet to take full advantage of those instructions.

The "simd" feature enables several handwritten arrow kernels that explicitly use SIMD intrinsics rather than relying on the rust compiler to do so.
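
As a rough illustration of what "explicitly use SIMD intrinsics" means (a hypothetical sketch, not one of the actual arrow kernels, which are gated behind the simd feature and built on packed_simd): the vector instructions are spelled out via std::arch instead of being left to the optimizer.

// Hypothetical sketch of an explicitly vectorized kernel using AVX2 intrinsics;
// this is not arrow-rs code.
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx2")]
unsafe fn add_4xi64_avx2(a: &[i64; 4], b: &[i64; 4]) -> [i64; 4] {
    use std::arch::x86_64::*;

    // Load four 64-bit lanes from each input (unaligned loads are fine here).
    let va = _mm256_loadu_si256(a.as_ptr() as *const __m256i);
    let vb = _mm256_loadu_si256(b.as_ptr() as *const __m256i);

    // A single instruction adds all four lanes at once.
    let sum = _mm256_add_epi64(va, vb);

    let mut out = [0i64; 4];
    _mm256_storeu_si256(out.as_mut_ptr() as *mut __m256i, sum);
    out
}

A caller would still have to check at runtime that the CPU supports AVX2 (for example with is_x86_feature_detected!) before invoking such a function.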

jiangzhx commented 3 years ago

I did more testing: load the parquet files into a MemTable first, then run the SQL query:

SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat;

Command with simd and mimalloc:

RUSTFLAGS='-C target-cpu=native' cargo +nightly  run --release --features "simd mimalloc" --example ssb_memtable_sum_example

prepare memtable usage millis: 130
sql execution usage millis: 26

Command with mimalloc only:

RUSTFLAGS='-C target-cpu=native' cargo +nightly  run --release --features "mimalloc" --example ssb_memtable_sum_example

prepare memtable usage millis: 130
sql execution usage millis: 25

Data size: 59,986,052 rows

I guess the reason is as @jorgecarleitao and @alamb described: the Rust compiler already auto-vectorizes the code to use SIMD.

code:

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;

use chrono::prelude::*;

use datafusion::prelude::*;

use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::logical_plan::Expr;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{common, displayable};
use futures::StreamExt;
use std::sync::Arc;

#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[cfg(feature = "mimalloc")]
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let parquet_path = "/Users/sylar/workspace/opensource/ssb-dbgen/output/parquet_10";

    let lineorder_flat_schema = Schema::new(vec![
        Field::new("LO_ORDERKEY", DataType::Int64, false),
        Field::new("LO_LINENUMBER", DataType::Int64, false),
        Field::new("LO_CUSTKEY", DataType::Int64, false),
        Field::new("LO_PARTKEY", DataType::Int64, false),
        Field::new("LO_SUPPKEY", DataType::Int64, false),
        Field::new("LO_ORDERDATE", DataType::Int64, false),
        Field::new("LO_ORDERPRIORITY", DataType::Utf8, false),
        Field::new("LO_SHIPPRIOTITY", DataType::Int64, false),
        Field::new("LO_QUANTITY", DataType::Int64, false),
        Field::new("LO_EXTENDEDPRICE", DataType::Int64, false),
        Field::new("LO_ORDTOTALPRICE", DataType::Int64, false),
        Field::new("LO_DISCOUNT", DataType::Int64, false),
        Field::new("LO_REVENUE", DataType::Int64, false),
        Field::new("LO_SUPPLYCOST", DataType::Int64, false),
        Field::new("LO_TAX", DataType::Int64, false),
        Field::new("LO_COMMITDATE", DataType::Int64, false),
        Field::new("LO_SHIPMODE", DataType::Utf8, false),
        Field::new("C_NAME", DataType::Utf8, false),
        Field::new("C_ADDRESS", DataType::Utf8, false),
        Field::new("C_CITY", DataType::Utf8, false),
        Field::new("C_NATION", DataType::Utf8, false),
        Field::new("C_REGION", DataType::Utf8, false),
        Field::new("C_PHONE", DataType::Utf8, false),
        Field::new("C_MKTSEGMENT", DataType::Utf8, false),
        Field::new("S_NAME", DataType::Utf8, false),
        Field::new("S_ADDRESS", DataType::Utf8, false),
        Field::new("S_CITY", DataType::Utf8, false),
        Field::new("S_NATION", DataType::Utf8, false),
        Field::new("S_REGION", DataType::Utf8, false),
        Field::new("S_PHONE", DataType::Utf8, false),
        Field::new("P_NAME", DataType::Utf8, false),
        Field::new("P_MFGR", DataType::Utf8, false),
        Field::new("P_CATEGORY", DataType::Utf8, false),
        Field::new("P_BRAND", DataType::Utf8, false),
        Field::new("P_COLOR", DataType::Utf8, false),
        Field::new("P_TYPE", DataType::Utf8, false),
        Field::new("P_SIZE", DataType::Int64, false),
        Field::new("P_CONTAINER", DataType::Utf8, false),
    ]);

    let project_schema = Schema::new(vec![
        Field::new("LO_EXTENDEDPRICE", DataType::Int64, false),
        Field::new("LO_DISCOUNT", DataType::Int64, false),
    ]);

    let dt = Local::now();

    let table_provider = Arc::new(ParquetTable::try_new_with_schema(
        parquet_path,
        lineorder_flat_schema,
        32,
        false,
    )?);

    let exec = table_provider
        .scan(&Option::Some(vec![9,11]), 8192, &[], None)
        .await?;
    let partition_count = exec.output_partitioning().partition_count();
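    // Execute every partition concurrently and collect each partition's record batches.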
    let tasks = (0..partition_count)
        .map(|part_i| {
            let exec = exec.clone();
            tokio::spawn(async move {
                let stream = exec.execute(part_i).await?;
                common::collect(stream).await
            })
        })
        .collect::<Vec<_>>();

    let mut data: Vec<Vec<RecordBatch>> = Vec::new();
    for task in tasks {
        let result = task.await.expect("MemTable::load could not join task")?;
        data.push(result);
    }

    let memtable = MemTable::try_new(SchemaRef::new(project_schema).clone(), data).unwrap();

    println!(
        "prepare memtable usage millis: {}",
        Local::now().timestamp_millis() - dt.timestamp_millis()
    );

    let execution_config = ExecutionConfig::new();
    let mut ctx = ExecutionContext::with_config(execution_config);
    ctx.register_table("lineorder_flat", Arc::new(memtable))?;
    let dt = Local::now();
    let sql = "SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat";
    let df = ctx.sql(sql)?;

    let results: Vec<RecordBatch> = df.collect().await?;
    print_batches(&results)?;
    println!(
        "sql execution usage millis: {}",
        Local::now().timestamp_millis() - dt.timestamp_millis()
    );
    Ok(())
}
Dandandan commented 3 years ago

@jiangzhx

I didn't spot a difference in the two RUSTFLAGS='-C target-cpu=native' cargo +nightly run --release --features "simd mimalloc" --example ssb_memtable_sum_example commands you showed.

If you do want to do a full comparison I would do:

And combinations of the above.

Without -C target-cpu=native the compiler only uses instructions up to SSE2 by default. There are exceptions to this in Arrow: for summing we also compile a version with AVX enabled by utilizing multiversion: https://github.com/apache/arrow-rs/blob/master/arrow/src/compute/kernels/aggregate.rs#L107 That uses runtime feature detection instead, so it is independent of the target-cpu setting.
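
The runtime-dispatch idea looks roughly like the following minimal, hand-rolled sketch (the linked arrow kernel uses the multiversion crate to generate such variants automatically; sum_i64 is a made-up name):

// Minimal sketch of runtime feature detection; not the arrow-rs implementation.
#[cfg(target_arch = "x86_64")]
fn sum_i64(values: &[i64]) -> i64 {
    // A copy of the loop compiled with AVX2 enabled, independent of target-cpu.
    #[target_feature(enable = "avx2")]
    unsafe fn sum_avx2(values: &[i64]) -> i64 {
        values.iter().sum()
    }

    if is_x86_feature_detected!("avx2") {
        // Safe: we just verified at runtime that this CPU supports AVX2.
        unsafe { sum_avx2(values) }
    } else {
        // Baseline path, limited to whatever the default target allows (SSE2 on x86_64).
        values.iter().sum()
    }
}

#[cfg(not(target_arch = "x86_64"))]
fn sum_i64(values: &[i64]) -> i64 {
    values.iter().sum()
}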

alamb commented 1 year ago

I think this question has been answered, so closing it.