apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Support custom SchemaAdapter on ParquetExec #10398

Closed HawaiianSpork closed 1 day ago

HawaiianSpork commented 1 week ago

Is your feature request related to a problem or challenge?

This is a feature request to allow the ParquetExec type to accept a SchemaAdapter instead of having a fixed one. By supporting an injected SchemaAdapter, the same ParquetExec could be reused by a number of protocols that build on parquet. For example, delta-rs keeps the schema separate from the parquet files so that schema evolution can be well controlled: the external schema can enrich the data inside the parquet files with missing nested columns or timezone information.

This same pattern may also be useful for other storage formats, since the mapper simply accepts a record batch from the file and a desired table schema.

Describe the solution you'd like

ParquetExec accepts a SchemaAdapterFactory, which ParquetExec then calls to create a SchemaAdapter per parquet file. The SchemaAdapter checks that the schemas can be mapped and returns a SchemaMapper (just like it does today), which is used to transform each RecordBatch into the desired format.
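A minimal sketch of the proposed factory/adapter/mapper split. The three trait names come from the issue; everything else (the method names, the error type, the simplified Schema/RecordBatch stand-ins, and the DefaultSchemaAdapterFactory) is hypothetical, used here only to make the sketch self-contained rather than to mirror the real arrow/DataFusion types:

```rust
// Simplified stand-ins for arrow's Schema and RecordBatch so the
// sketch compiles on its own; the real traits would use arrow types.
#[derive(Clone, Debug, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

#[derive(Debug)]
struct RecordBatch {
    schema: Schema,
    rows: usize,
}

/// Transforms a batch read from one parquet file into the table schema.
trait SchemaMapper {
    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch, String>;
}

/// Checks that a file schema can be mapped to the table schema and,
/// if so, produces a SchemaMapper for that file.
trait SchemaAdapter {
    fn map_schema(&self, file_schema: &Schema) -> Result<Box<dyn SchemaMapper>, String>;
}

/// Injected into ParquetExec and called once per parquet file.
trait SchemaAdapterFactory: Send + Sync {
    fn create(&self, table_schema: Schema) -> Box<dyn SchemaAdapter>;
}

// A hypothetical default implementation: accept file schemas whose
// fields are a subset of the table schema's fields.
struct DefaultSchemaAdapterFactory;

struct DefaultAdapter {
    table_schema: Schema,
}

struct DefaultMapper {
    table_schema: Schema,
}

impl SchemaMapper for DefaultMapper {
    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch, String> {
        // A real mapper would reorder, cast, and fill missing columns;
        // here we just restamp the batch with the table schema.
        Ok(RecordBatch {
            schema: self.table_schema.clone(),
            rows: batch.rows,
        })
    }
}

impl SchemaAdapter for DefaultAdapter {
    fn map_schema(&self, file_schema: &Schema) -> Result<Box<dyn SchemaMapper>, String> {
        if file_schema.fields.iter().all(|f| self.table_schema.fields.contains(f)) {
            Ok(Box::new(DefaultMapper {
                table_schema: self.table_schema.clone(),
            }))
        } else {
            Err("file schema has fields not in the table schema".to_string())
        }
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(&self, table_schema: Schema) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultAdapter { table_schema })
    }
}

fn main() {
    let table = Schema { fields: vec!["a".into(), "b".into()] };
    let adapter = DefaultSchemaAdapterFactory.create(table.clone());
    let file_schema = Schema { fields: vec!["a".into()] };
    let mapper = adapter.map_schema(&file_schema).expect("schemas should map");
    let out = mapper
        .map_batch(RecordBatch { schema: file_schema, rows: 3 })
        .unwrap();
    assert_eq!(out.schema, table);
    assert_eq!(out.rows, 3);
}
```

A protocol like delta-rs would then supply its own factory in place of DefaultSchemaAdapterFactory, while ParquetExec's file-reading machinery stays unchanged.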

Describe alternatives you've considered

One alternative is to treat ParquetExec as closed to modification and instead either decorate it or build a new ExecutionPlan. However, there is a lot of parquet-specific code in ParquetExec that these protocols would have to rebuild. Alternatively, we could change the ExecutionPlan interface, but that would be a breaking change.

Another approach is to say that we don't want to support different ways of casting arrow batches for different protocols, and that all such changes should be made in arrow. I think different applications are going to have different constraints on which migrations they choose to support. For instance, arrow today casts structs based on the position of their fields; this is fine for short-lived record batches that just need fields renamed, but it is problematic for long-lived arrow batches stored as parquet, where the code that wrote the record batch may not be the same code that reads it. So there is opportunity both to improve arrow and to allow how it is used to diverge.
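To illustrate the positional-versus-name-based concern above, here is a self-contained sketch with a placeholder Field type (not arrow-rs code): when a writer stored struct fields in a different order than the reader expects, a position-based mapping silently pairs the wrong columns, while a name-based mapping stays correct.

```rust
// Placeholder for arrow's Field; only the name matters for this sketch.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
}

fn field(name: &str) -> Field {
    Field { name: name.to_string() }
}

// Position-based mapping: source column i feeds target field i,
// regardless of names. Returns (source_index, target_index) pairs.
fn map_by_position(source: &[Field], target: &[Field]) -> Vec<(usize, usize)> {
    (0..source.len().min(target.len())).map(|i| (i, i)).collect()
}

// Name-based mapping: each target field looks up the source column
// with the same name.
fn map_by_name(source: &[Field], target: &[Field]) -> Vec<(usize, usize)> {
    target
        .iter()
        .enumerate()
        .filter_map(|(t, f)| source.iter().position(|s| s.name == f.name).map(|s| (s, t)))
        .collect()
}

fn main() {
    // The writer stored fields as [b, a]; the reader expects [a, b].
    let source = [field("b"), field("a")];
    let target = [field("a"), field("b")];

    // Positional mapping pairs b->a and a->b: wrong data, no error.
    assert_eq!(map_by_position(&source, &target), vec![(0, 0), (1, 1)]);

    // Name-based mapping pairs the right columns.
    assert_eq!(map_by_name(&source, &target), vec![(1, 0), (0, 1)]);
}
```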

Additional context

I've got a code change ready and can open a PR soon.

We had some conversation about this on Discord here: https://discord.com/channels/885562378132000778/1166447479609376850/1236683250244517991

tustvold commented 1 week ago

I wonder if the way to achieve this might be something like https://github.com/apache/datafusion/issues/2293, which would allow making schema adaptation a standard execution node rather than an implementation detail of ParquetExec.

alamb commented 1 week ago

Perhaps a good starting place would be to make SchemaAdapter public. It seems to be an entirely private struct today: https://docs.rs/datafusion/latest/datafusion/index.html?search=SchemaAdapter

In general I think this ticket is another potential reason to make ParquetExec easier to use by other libraries, not just by DataFusion itself (via ListingTable).

We certainly use ParquetExec directly in InfluxDB 3.0, so I would be interested in helping make this easier to do (as it would also decrease our maintenance burden).