apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

[comet-parquet-exec] Use Parquet schema for scan instead of Spark schema #1103

Open mbutrovich opened 2 days ago

mbutrovich commented 2 days ago

Currently we get the scan schema from the plan node, serialize that back to a Parquet schema, and then parse it on the native side. This round trip is lossy, particularly for timestamps. For example:

schema: message root {
  optional int64 _0 (TIMESTAMP(MILLIS,true));
  optional int64 _1 (TIMESTAMP(MICROS,true));
  optional int64 _2 (TIMESTAMP(MILLIS,true));
  optional int64 _3 (TIMESTAMP(MILLIS,false));
  optional int64 _4 (TIMESTAMP(MICROS,true));
  optional int64 _5 (TIMESTAMP(MICROS,false));
  optional int64 _6 (INTEGER(64,true));
}

dataSchema: message spark_schema {
  optional int96 _0;
  optional int96 _1;
  optional int96 _2;
  optional int64 _3 (TIMESTAMP(MICROS,false));
  optional int96 _4;
  optional int64 _5 (TIMESTAMP(MICROS,false));
  optional int64 _6;
}

The former is the schema from the original Parquet footer; the latter is what we get back after the round trip through Spark. We need the original schema to handle timestamps correctly in ParquetExec.
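For reference, the footer schema above can be dumped with the Rust parquet crate roughly as follows. The PR actually reads the footer on the Spark side (CometParquetFileFormat); this is just an illustrative sketch, and print_footer_schema is a made-up name:

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::schema::printer::print_schema;

// Print the message type exactly as stored in the file footer,
// e.g. `optional int64 _0 (TIMESTAMP(MILLIS,true));`, before any
// round trip through Spark's schema representation.
fn print_footer_schema(path: &str) -> parquet::errors::Result<()> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    let schema = reader.metadata().file_metadata().schema();
    print_schema(&mut std::io::stdout(), schema);
    Ok(())
}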

This PR extracts some code from elsewhere (CometParquetFileFormat, CometNativeScanExec) to read the footer from the Parquet file and serialize the original metadata. We also now generate the projection vector on the Spark side, because the required columns are in the Spark schema format and so will not match the Parquet schema 1:1. On the native side, we now have to regenerate the required schema from the Parquet schema using the projection vector (converted to a DataFusion ProjectionMask), as sketched below.
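A minimal sketch of that native-side step, assuming the Parquet schema arrives as a serialized message-type string and the projection vector indexes top-level (root) columns; required_arrow_schema is a hypothetical name, not Comet's actual API:

use std::sync::Arc;

use arrow_schema::Schema;
use parquet::arrow::{parquet_to_arrow_schema_by_columns, ProjectionMask};
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

// Hypothetical sketch: rebuild the required Arrow schema from the
// original Parquet schema plus the Spark-generated projection vector.
fn required_arrow_schema(
    serialized_schema: &str,
    projection: &[usize],
) -> parquet::errors::Result<Schema> {
    // Parse the Parquet message type serialized on the Spark side.
    let message_type = parse_message_type(serialized_schema)?;
    let descriptor = SchemaDescriptor::new(Arc::new(message_type));

    // Convert the projection vector into a ProjectionMask over the
    // top-level columns of the Parquet schema.
    let mask = ProjectionMask::roots(&descriptor, projection.iter().copied());

    // Derive the Arrow schema for only the projected columns, keeping
    // the footer's original timestamp annotations intact.
    parquet_to_arrow_schema_by_columns(&descriptor, mask, None)
}

Because the mask is built against the original footer schema rather than Spark's converted one, the resulting Arrow schema keeps distinctions like TIMESTAMP(MILLIS) vs. TIMESTAMP(MICROS) that the old path lost.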