apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Introduce ProjectionMask To Allow Nested Projection Pushdown #2581

Open tustvold opened 2 years ago

tustvold commented 2 years ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently projection indices are pushed down to scans as Vec<usize>. This creates some ambiguities:

To demonstrate how these problems intertwine, consider the case of

Struct {
   first: Struct {
      a: Integer,
      b: Integer,
   },
   second: Struct {
      c: Integer
   }
}

If I project ["first.a", "second.c", "first.b"] what is the resulting schema?

Describe the solution you'd like

I would like to propose we instead pushdown a leaf column mask, where leaf columns are fields with no children, as enumerated by a depth-first-scan of the schema tree. This avoids any ordering ambiguities, whilst also being relatively straightforward to implement and interpret.
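As a rough illustration of the idea (not the arrow-rs or DataFusion API), the sketch below enumerates leaf columns depth-first over a simplified schema tree and turns a set of requested paths into a leaf mask. The `Node` type and the `leaf_paths`, `leaf_mask` and `example_schema` helpers are hypothetical names made up for this example.

```rust
/// Hypothetical, simplified schema tree; a stand-in for a real schema type.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    /// A field with no children.
    Leaf(String),
    /// A struct field with named children.
    Struct(String, Vec<Node>),
}

/// Appends the dotted path of every leaf below `node` to `out`,
/// visiting children depth-first in schema order.
fn leaf_paths(node: &Node, prefix: &str, out: &mut Vec<String>) {
    let join = |name: &str| {
        if prefix.is_empty() {
            name.to_string()
        } else {
            format!("{prefix}.{name}")
        }
    };
    match node {
        Node::Leaf(name) => out.push(join(name)),
        Node::Struct(name, children) => {
            let path = join(name);
            for child in children {
                leaf_paths(child, &path, out);
            }
        }
    }
}

/// Builds one boolean per leaf column. The order of `requested` is
/// irrelevant: the mask is always laid out in schema order.
fn leaf_mask(roots: &[Node], requested: &[&str]) -> Vec<bool> {
    let mut paths = Vec::new();
    for root in roots {
        leaf_paths(root, "", &mut paths);
    }
    paths
        .iter()
        .map(|p| requested.contains(&p.as_str()))
        .collect()
}

/// The `first` / `second` example schema from above.
fn example_schema() -> Vec<Node> {
    vec![
        Node::Struct(
            "first".into(),
            vec![Node::Leaf("a".into()), Node::Leaf("b".into())],
        ),
        Node::Struct("second".into(), vec![Node::Leaf("c".into())]),
    ]
}
```

Under these assumptions, `leaf_mask(&example_schema(), &["first.a", "second.c", "first.b"])` and the same call with the paths in any other order produce the same mask, so the output schema order is fixed by the file schema rather than by the request.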

I recently introduced a similar concept to the parquet reader https://github.com/apache/arrow-rs/pull/1716. We could theoretically lift this into arrow-rs, potentially adding support to RecordBatch for it, and then use this in DataFusion.

Describe alternatives you've considered

We could choose not to support nested pushdown.

Additional context

Currently pushdown for nested types in ParquetExec is broken - https://github.com/apache/arrow-datafusion/issues/2453

Thoughts @andygrove @alamb

alamb commented 2 years ago

I like the idea of a single Projection structure that understands how to select nested types.

We could theoretically lift this into arrow-rs

As long as we can maintain independence for parquet (i.e. one can select subsets of parquet files without having to use Arrow), that is great.

If we don't get the Projection into arrow-rs in time, we could also copy/paste the Projection code into DataFusion and provide a conversion back and forth during the interim period

kesavkolla commented 2 years ago

It gets a little more involved if the struct has members that are themselves lists of structs. How will the schema pushdown happen for them? E.g.:

struct Employee {
  name: String,
  departments: Vec<Department>,
}
struct Department {
  id: u32,
  name: String,
}

How will the projection appear for this?

tustvold commented 2 years ago

How will the projection appear for this?

In this case you have roots of

  1. name
  2. departments

And leaves of

  1. name
  2. departments.id
  3. departments.name

So if say you projected with leaves [1, 3] you would get a schema of

struct Employee {
  name: String,
  departments: Vec<ProjectedDepartment>
}
struct ProjectedDepartment {
  name: String
}

Or in terms of the arrow datatype

DataType::Struct(vec![
  Field::new("name", DataType::Utf8, <nullable>),
  Field::new("departments", DataType::List(Box::new(
    Field::new("element", DataType::Struct(vec![
      Field::new("name", DataType::Utf8, <nullable>)
    ]), <nullable>)
  )), <nullable>)
])
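To make the pruning step concrete, here is a minimal sketch that applies a leaf mask to a simplified type tree. `Ty`, `project` and `employee` are hypothetical names for this example and only stand in for arrow's `DataType`. The mask is 0-based here, so `[true, false, true]` corresponds to leaves [1, 3] in the numbering above.

```rust
/// Hypothetical, simplified type tree; a stand-in for arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Utf8,
    UInt32,
    List(Box<Ty>),
    Struct(Vec<(String, Ty)>),
}

/// Prunes `ty` down to the leaves selected in `mask`. `next` is the index of
/// the next leaf in depth-first order and is advanced once per leaf visited.
/// Returns `None` when no leaf below `ty` is selected.
fn project(ty: &Ty, mask: &[bool], next: &mut usize) -> Option<Ty> {
    match ty {
        // A list survives only if its element type does.
        Ty::List(item) => project(item, mask, next).map(|t| Ty::List(Box::new(t))),
        // A struct keeps the children that survive, in schema order.
        Ty::Struct(fields) => {
            let mut kept = Vec::new();
            for (name, child) in fields {
                if let Some(t) = project(child, mask, next) {
                    kept.push((name.clone(), t));
                }
            }
            if kept.is_empty() {
                None
            } else {
                Some(Ty::Struct(kept))
            }
        }
        // Leaf column: consume one mask entry.
        Ty::Utf8 | Ty::UInt32 => {
            let idx = *next;
            *next += 1;
            if mask.get(idx).copied().unwrap_or(false) {
                Some(ty.clone())
            } else {
                None
            }
        }
    }
}

/// The `Employee` schema from the question above.
fn employee() -> Ty {
    Ty::Struct(vec![
        ("name".to_string(), Ty::Utf8),
        (
            "departments".to_string(),
            Ty::List(Box::new(Ty::Struct(vec![
                ("id".to_string(), Ty::UInt32),
                ("name".to_string(), Ty::Utf8),
            ]))),
        ),
    ])
}
```

Calling `project(&employee(), &[true, false, true], &mut 0)` in this sketch keeps `name` and `departments`, with the list element struct reduced to its `name` field only.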

Does that make sense?

kesavkolla commented 2 years ago

My question is whether the projection will go down to nested levels.

Eg: employee.departments[*].name, employee.departments[0].name

tustvold commented 2 years ago

At least in the case of arrow and parquet, list indexing is more of a filter than a cheap projection - it requires rewriting the buffers.
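A rough sketch of why that is, using a hypothetical, simplified list column (an offsets buffer plus a flat values buffer) rather than the real arrow types. `ListColumn` and `list_index` are names made up for this example.

```rust
/// Hypothetical, simplified list column: row `i` occupies
/// `values[offsets[i]..offsets[i + 1]]`.
struct ListColumn {
    offsets: Vec<usize>,
    values: Vec<i32>,
}

/// Takes element `index` of every row. The selected values are not
/// contiguous in `values`, so they have to be gathered row by row into a
/// newly allocated buffer; rows that are too short yield `None`.
fn list_index(col: &ListColumn, index: usize) -> Vec<Option<i32>> {
    col.offsets
        .windows(2)
        .map(|w| {
            let (start, end) = (w[0], w[1]);
            if start + index < end {
                Some(col.values[start + index])
            } else {
                None
            }
        })
        .collect()
}
```

By contrast, dropping a struct field only means not reading that child column at all, which is why it fits a projection mask while list indexing behaves more like a per-row filter.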

Perhaps we could do the common case as described here, and potentially add list index pushdown as an extension once we have workloads that stand to benefit? Or did you have a particular suggestion on how to handle it?

kesavkolla commented 2 years ago

I agree with your thought process. List indexing is not a pushdown in the same way that selecting the column itself is. I am guessing that at some point DataFusion will support list indexing at the SQL level.

nl5887 commented 2 years ago

@tustvold is this something you're working on already?

nl5887 commented 2 years ago

This PR is slightly related, as predicates aren't being pushed down currently:

https://github.com/apache/arrow-datafusion/pull/2724

tustvold commented 2 years ago

@nl5887 I am not currently working on this, but would be happy to assist if you or someone else wanted to pick it up 😀

nl5887 commented 2 years ago

@tustvold I think quite a lot needs to be changed. Most of the code does column selection by name, whereas the relevant data from the SQL parsing (the indexed field structure) is lost.

Correct me if I'm wrong, but I think the datafusion core column needs to be converted to an enum consisting of Column and IndexedField. The retrieval from the DFSchema needs to be done using the column itself instead of (qualified) name, and the required_columns shouldn't be derived from the output schema, but from the plan itself.

Probably a lot more needs to be done, but this is necessary to be able to push down the projections.

Looking forward to your thoughts!

tustvold commented 2 years ago

but I think the datafusion core column needs to be converted to an enum consisting of Column and IndexedField

As described above, list index pushdown is likely to yield limited benefits for any of the formats we currently support. As such I don't think we need to support it in projection pushdown as a first step.

whereas the relevant data of the sql parsing (the indexed field structure) is lost.

I'm not sure I follow what you mean. I would have expected list indexing to just be a different type of PhysicalExpr, without the need to leak into the DFSchema at all?