apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0

Fix the handling of partitioned ListingTables (only Parquet - currently) #966

Closed bcmcmill closed 4 months ago

bcmcmill commented 5 months ago

Which issue does this PR close?

Closes #747. Closes #862.

Rationale for this change

Hive-style partition columns were not being handled properly in Ballista, the way they are in DataFusion.
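For readers unfamiliar with the layout: in a Hive-style partitioned table, the partition values live in `key=value` directory names rather than inside the data files. The sketch below is a minimal, standard-library-only illustration of that idea. The function name `parse_hive_partitions` and the example path are hypothetical and are not taken from DataFusion or Ballista.

```rust
/// Extract `key=value` directory segments from a Hive-style path.
/// The final path segment is treated as the file name and is skipped.
/// (Hypothetical helper for illustration only.)
fn parse_hive_partitions(path: &str) -> Vec<(String, String)> {
    let segments: Vec<&str> = path.split('/').collect();
    let dirs = match segments.split_last() {
        Some((_file_name, dirs)) => dirs,
        None => return Vec::new(),
    };
    dirs.iter()
        // Directories without `=` (such as the table root) are ignored.
        .filter_map(|segment| segment.split_once('='))
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect()
}

fn main() {
    let parts = parse_hive_partitions("table/year=2024/month=01/part-0.parquet");
    println!("{:?}", parts);
}
```

Because `year` and `month` come from the path, they belong to the table schema but not to the schema of the Parquet file itself.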

What changes are included in this PR?

Added a transform to the ExecutionPlan in ShuffleWriterExec that corrects the file_schema in the FileScanConfig. This only addresses ListingTables of Parquet type, because I did not see an easy way to extract the FileCompressionType information.

Hopefully someone can point me to where this problem can be corrected permanently; it appears the fix needs to happen in the DataFusion repository.

Are there any user-facing changes?

No

bcmcmill commented 5 months ago

It would likely be better to update parse_protobuf_file_scan_config in datafusion-proto to be like the following, so that the fix actually extends to all file types.

pub fn parse_protobuf_file_scan_config(
    proto: &protobuf::FileScanExecConf,
    registry: &dyn FunctionRegistry,
) -> Result<FileScanConfig> {
    let schema: Arc<Schema> = Arc::new(convert_required!(proto.schema)?);
    let projection = proto
        .projection
        .iter()
        .map(|i| *i as usize)
        .collect::<Vec<_>>();
    let projection = if projection.is_empty() {
        None
    } else {
        Some(projection)
    };
    let statistics = convert_required!(proto.statistics)?;

    let file_groups: Vec<Vec<PartitionedFile>> = proto
        .file_groups
        .iter()
        .map(|f| f.try_into())
        .collect::<Result<Vec<_>, _>>()?;

    let object_store_url = match proto.object_store_url.is_empty() {
        false => ObjectStoreUrl::parse(&proto.object_store_url)?,
        true => ObjectStoreUrl::local_filesystem(),
    };

    // extract types of partition columns
    let table_partition_cols = proto
        .table_partition_cols
        .iter()
        .map(|col| Ok(schema.field_with_name(col)?.clone()))
        .collect::<Result<Vec<_>>>()?;

    let mut output_ordering = vec![];
    for node_collection in &proto.output_ordering {
        let sort_expr = node_collection
            .physical_sort_expr_nodes
            .iter()
            .map(|node| {
                let expr = node
                    .expr
                    .as_ref()
                    .map(|e| parse_physical_expr(e.as_ref(), registry, &schema))
                    .unwrap()?;
                Ok(PhysicalSortExpr {
                    expr,
                    options: SortOptions {
                        descending: !node.asc,
                        nulls_first: node.nulls_first,
                    },
                })
            })
            .collect::<Result<Vec<PhysicalSortExpr>>>()?;
        output_ordering.push(sort_expr);
    }

    // Currently when the schema for the file is set the partition columns
    // are present, which is illegal because they aren't actually in the files.
    // This is a workaround to remove them from the schema.
    let file_schema = Arc::new(Schema::new(
        schema
            .fields()
            .iter()
            .filter(|field| !table_partition_cols.contains(field))
            .cloned()
            .collect::<Vec<_>>(),
    ));

    Ok(FileScanConfig {
        object_store_url,
        file_schema,
        file_groups,
        statistics,
        projection,
        limit: proto.limit.as_ref().map(|sl| sl.limit as usize),
        table_partition_cols,
        output_ordering,
    })
}

I can open a PR for that over there if that is more appropriate.
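The key step in the function above is the filter that builds `file_schema`. It can be sketched in isolation with a simplified stand-in for the Arrow types. Here `Field` is a hypothetical plain struct, not `arrow::datatypes::Field`, and the helper name `file_schema` is an assumption made for illustration.

```rust
/// Simplified stand-in for an Arrow field (hypothetical, illustration only).
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
    nullable: bool,
}

impl Field {
    fn new(name: &str, data_type: &str, nullable: bool) -> Self {
        Field {
            name: name.to_string(),
            data_type: data_type.to_string(),
            nullable,
        }
    }
}

/// Derive the file schema by dropping every field that is a partition column.
/// Like the snippet above, this compares whole fields, not just names.
fn file_schema(table_schema: &[Field], partition_cols: &[Field]) -> Vec<Field> {
    table_schema
        .iter()
        .filter(|field| !partition_cols.contains(field))
        .cloned()
        .collect()
}

fn main() {
    let table_schema = vec![
        Field::new("id", "Int64", false),
        Field::new("value", "Utf8", true),
        Field::new("year", "Utf8", false),
    ];
    let partition_cols = vec![Field::new("year", "Utf8", false)];

    let fields = file_schema(&table_schema, &partition_cols);
    let names: Vec<&str> = fields.iter().map(|f| f.name.as_str()).collect();
    println!("{:?}", names);
}
```

One thing to keep in mind with this approach: because the comparison uses full field equality, a partition column whose type or nullability differs from the entry in the table schema would not be filtered out. Matching on the column name alone would be a stricter alternative.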

andygrove commented 5 months ago

> I can open a PR for that over there if that is more appropriate.

@bcmcmill Yes, I think that would be better. Once a fix is merged over in DF we can update this repo to depend on DF from GitHub, rather than having to wait for a new release.

bcmcmill commented 5 months ago

https://github.com/apache/arrow-datafusion/pull/9126

bcmcmill commented 4 months ago

apache/arrow-datafusion#9126

This DataFusion PR was merged, so I will close this one.