apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0

Ballista: Partition columns are duplicated in protobuf decoding. #484

Open thinkharderdev opened 2 years ago

thinkharderdev commented 2 years ago

Describe the bug
Trying to read a partitioned Parquet dataset while still allowing predicate pushdown on the partition columns, I am manually constructing a table scan LogicalPlan over a manually constructed ListingTable that specifies the partition column(s). The ListingTable constructor adds the partition columns to the schema. The plan is then serialized and sent to the Ballista scheduler, which deserializes it and constructs a new ListingTable. That constructor adds the partition column to the schema a second time, which causes an error when constructing the DFSchema.
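The failure mode is easy to see in isolation. A minimal sketch, assuming DFSchema rejects duplicate unqualified field names when converting from an Arrow Schema (the assertion, not the exact error text, is the point):

    use std::convert::TryFrom;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::logical_plan::DFSchema;

    // A schema in which the partition column appears twice, as happens
    // when ListingTable::new appends it during deserialization even
    // though the serialized schema already contained it.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("my-partition-column", DataType::Utf8, false),
        Field::new("my-partition-column", DataType::Utf8, false),
    ]);

    // Converting to a DFSchema should fail with a duplicate-field error.
    assert!(DFSchema::try_from(schema).is_err());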

To Reproduce
Steps to reproduce the behavior:

    // Assume we have a partitioned Parquet table
    let table_path = "/path/to/table";

    // Construct a ListingOptions
    let listing_options = ListingOptions {
        file_extension: String::new(),
        format: Arc::new(ParquetFormat::default()),
        table_partition_cols: vec!["my-partition-column".into()],
        collect_stat: true,
        target_partitions: 1,
    };

    // Infer the schema
    let schema = listing_options.infer_schema(store.clone(), table_path).await?;

    // Construct a ListingTable with our provider
    let provider = ListingTable::new(
        store,
        table_path.to_string(),
        schema,
        listing_options,
    );

    // Create a table scan plan
    let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, Arc::new(provider), None)?
        .limit(10)?
        .build()?;

    // Create a DistributedQueryExec
    let query = Arc::new(DistributedQueryExec::new(scheduler_url, config, plan));

    // Execute the query
    let stream = query.execute(0).await?;
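The duplication can also be triggered without a running scheduler by round-tripping the plan through the Ballista protobuf representation. A rough sketch, assuming the TryInto conversions implemented in ballista_core::serde (exact paths and signatures may vary by version):

    use std::convert::TryInto;
    use ballista_core::serde::protobuf::LogicalPlanNode;

    // Encode the plan built above, then decode it again. Decoding
    // rebuilds the provider via ListingTable::new, which appends the
    // partition column to a schema that already contains it.
    let proto: LogicalPlanNode = (&plan).try_into()?;
    let round_trip: LogicalPlan = (&proto).try_into()?;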

Expected behavior
This should work, and the planner should push down a filter on my-partition-column to the physical scan so that we only read Parquet files from the requested partitions.
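For example, a predicate like the following (an illustrative variant of the repro above, using col and lit from datafusion::prelude) should prune non-matching partition directories at scan time:

    use datafusion::prelude::{col, lit};

    // Scan with a filter on the partition column instead of a limit.
    let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, Arc::new(provider), None)?
        .filter(col("my-partition-column").eq(lit("2021-01-01")))?
        .build()?;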

Additional context
A simple way to fix this would be to check in the ListingTable constructor whether the partition columns are already included in the schema:

    pub fn new(
        object_store: Arc<dyn ObjectStore>,
        table_path: String,
        file_schema: SchemaRef,
        options: ListingOptions,
    ) -> Self {
        // Add the partition columns to the file schema
        let mut table_fields = file_schema.fields().clone();
        for part in &options.table_partition_cols {
            // Only add the partition column if it doesn't already exist
            if !table_fields.iter().any(|f| f.name() == part) {
                table_fields.push(Field::new(
                    part,
                    DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
                    false,
                ));
            }
        }

        Self {
            object_store,
            table_path,
            file_schema,
            table_schema: Arc::new(Schema::new(table_fields)),
            options,
        }
    }

When I try this locally, it works in the sense that I don't get an error for duplicate fields, but I do get another error downstream. My guess is that this is because the partition column datatype is hard-coded, but I haven't debugged it fully.
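One way to sanity-check the merge itself is to pull the field-merging logic into a standalone helper and verify it is idempotent. merge_partition_cols is a hypothetical helper written for this sketch, not part of the DataFusion API, and DataType::Utf8 stands in for DEFAULT_PARTITION_COLUMN_DATATYPE:

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

    // Hypothetical helper mirroring the constructor logic above.
    fn merge_partition_cols(file_schema: &SchemaRef, partition_cols: &[String]) -> Vec<Field> {
        let mut table_fields = file_schema.fields().clone();
        for part in partition_cols {
            // Only add the partition column if it doesn't already exist.
            if !table_fields.iter().any(|f| f.name() == part) {
                table_fields.push(Field::new(part, DataType::Utf8, false));
            }
        }
        table_fields
    }

    // file_schema / partition_cols as in the repro above.
    // Applying the merge twice must not grow the field list further.
    let once = merge_partition_cols(&file_schema, &partition_cols);
    let twice = merge_partition_cols(&Arc::new(Schema::new(once.clone())), &partition_cols);
    assert_eq!(once, twice);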

houqp commented 2 years ago

Thanks @thinkharderdev for the report, this is due to: https://github.com/apache/arrow-datafusion/blob/68db579181bd826e6ab6cd659f52d443b950eaa5/datafusion/src/datasource/listing/table.rs#L128-L135

The fix should be pretty straightforward: we need a new constructor method for ListingTable for plan ser/de, which will just take the field arguments and store them in the new struct as is, without extra logic. Then we just need to call that new constructor method in https://github.com/apache/arrow-datafusion/blob/68db579181bd826e6ab6cd659f52d443b950eaa5/ballista/rust/core/src/serde/logical_plan/from_proto.rs#L200.
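A rough sketch of what that could look like; the method name and the struct layout are assumptions here, not the eventual API:

    impl ListingTable {
        /// Build a ListingTable from already-resolved parts without
        /// re-deriving the table schema. Intended for plan ser/de, where
        /// the deserialized schema already contains the partition columns.
        pub fn new_with_schema(
            object_store: Arc<dyn ObjectStore>,
            table_path: String,
            file_schema: SchemaRef,
            table_schema: SchemaRef,
            options: ListingOptions,
        ) -> Self {
            Self {
                object_store,
                table_path,
                file_schema,
                table_schema,
                options,
            }
        }
    }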

thinkharderdev commented 2 years ago


Cool. I can take a crack at that. Thanks!