apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6.23k stars 1.18k forks source link

Complete substrait support for ParquetExec round trip #9347

Open alamb opened 8 months ago

alamb commented 8 months ago

@devinjdangelo says

I looked into the example and the physical plan Substrait producer/consumer code. Unfortunately, for physical plans, the Substrait consumer and producer are only implemented for ParquetExec, and even then they are not fully implemented, so I do not believe any practical example will execute without further development.

Here is an example which makes it further than the above but panics on the roundtrip assertion:

use datafusion::prelude::*;
use std::collections::HashMap;
use datafusion::error::Result;
use datafusion_substrait::physical_plan;
/// Round-trips the physical plan for a full scan of `alltypes_plain` through
/// the Substrait physical-plan producer and consumer, then asserts that the
/// `Debug` rendering of the plan is unchanged.
///
/// The final assertion is expected to fail for as long as the round trip
/// drops parts of the plan; that failure is what this example demonstrates.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // Register the parquet test file under the table name `alltypes_plain`.
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::parquet_test_data();
    let parquet_path = format!("{testdata}/alltypes_plain.parquet");
    ctx.register_parquet("alltypes_plain", &parquet_path, ParquetReadOptions::default())
        .await?;

    // Build the physical plan for a scan of every column of that table.
    let original_plan = ctx
        .sql("SELECT * from alltypes_plain")
        .await?
        .create_physical_plan()
        .await?;

    // Producer side: physical plan -> Substrait (protobuf) Rel.
    let mut extension_info = (vec![], HashMap::new());
    let substrait_rel = physical_plan::producer::to_substrait_rel(
        original_plan.as_ref(),
        &mut extension_info,
    )?;

    // Consumer side: Substrait Rel -> physical plan, passing an empty map as
    // the third argument.
    let empty_map = HashMap::new();
    let restored_plan =
        physical_plan::consumer::from_substrait_rel(&ctx, &substrait_rel, &empty_map).await?;

    // Compare the two plans through their Debug output.
    let before = format!("{:?}", original_plan);
    let after = format!("{:?}", restored_plan);
    assert_eq!(before, after);
    Ok(())
}

And here is the panic output:

thread 'main' panicked at datafusion/substrait/src/lib.rs:37:2:
assertion `left == right` failed
  left: "ParquetExec { pushdown_filters: None, reorder_filters: None, enable_page_index: None, enable_bloom_filter: None, base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Exact(8), total_byte_size: Exact(671), column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], projected_statistics: Statistics { num_rows: Exact(8), total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, 
ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, distinct_count: Absent }] }, projected_schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"bool_col\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"tinyint_col\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"smallint_col\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"int_col\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"bigint_col\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"float_col\", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"double_col\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { 
name: \"date_string_col\", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"string_col\", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_col\", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, projected_output_ordering: [], metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [Metric { value: Count { name: \"num_predicate_creation_errors\", count: Count { value: 0 } }, labels: [], partition: None }] } } }, predicate: None, pruning_predicate: None, page_pruning_predicate: None, metadata_size_hint: None, parquet_file_reader_factory: None }"
 right: "ParquetExec { pushdown_filters: None, reorder_filters: None, enable_page_index: None, enable_bloom_filter: None, base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [] }, file_groups={1 group: [[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]}, projected_statistics: Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [] }, projected_schema: Schema { fields: [], metadata: {} }, projected_output_ordering: [], metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [Metric { value: Count { name: \"num_predicate_creation_errors\", count: Count { value: 0 } }, labels: [], partition: None }] } } }, predicate: None, pruning_predicate: None, page_pruning_predicate: None, metadata_size_hint: None, parquet_file_reader_factory: None }"
stack backtrace:
...

You can see that the round trip lost many details about the ParquetExec, such as `projected_schema` and `projected_statistics`.

I think if we want to include a user facing example of a physical plan substrait roundtrip, we will need to cut a ticket to complete the implementation of ParquetExec to substrait first.

It looks like #5176 built the initial framework for serializing physical plans, but it hasn't been picked up since then.

Originally posted by @devinjdangelo in https://github.com/apache/arrow-datafusion/issues/9299#issuecomment-1958303415

alamb commented 4 months ago

I added to the substrait support epic: https://github.com/apache/datafusion/issues/5173