Netflix / iceberg

Iceberg is a table format for large, slow-moving tabular data
Apache License 2.0

Partition schema mangling for ORC #21

Open omalley opened 6 years ago

rdblue commented 6 years ago

What do you mean by "schema mangling"?

omalley commented 6 years ago

I wasn't clear on what was required; I just see that the reader path for Parquet and Avro mangles the schemas for partitioned tables:

    if (hasJoinedPartitionColumns) {
      // schema used to read data files
      Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
      Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
      Schema joinedSchema = TypeUtil.join(readSchema, partitionSchema);
      PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
      JoinedRow joined = new JoinedRow();

      InternalRow partition = convertToRow.apply(file.partition());
      joined.withRight(partition);

      // create joined rows and project from the joined schema to the final schema
      Iterator<InternalRow> joinedIter = transform(
          newParquetIterator(location, task, readSchema), joined::withLeft);

      unsafeRowIterator = transform(joinedIter,
          APPLY_PROJECTION.bind(projection(finalSchema, joinedSchema))::invoke);

so I assume I need something similar for ORC. I just didn't dig into the details of what was happening in your code.

rdblue commented 6 years ago

Makes sense. For identity partitions, where the exact value is stored in the manifest file, we join the data rows to those values and then project to get the column order to match the table's order (we don't reorder columns at read time because of a limitation in Spark's Parquet read path that we are reusing).
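
To make the join-then-project idea concrete, here is a minimal, self-contained sketch of the same pattern outside of Spark and Iceberg. All names in it (dataColumns, partitionValues, finalOrder) are invented for the illustration; the real code path uses Iceberg's TypeUtil schema helpers, Spark's JoinedRow, and an unsafe projection, as in the snippet above.

    // Hypothetical sketch: join file columns with identity-partition values
    // from the manifest, then project to the table's declared column order.
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class JoinProjectSketch {
      public static void main(String[] args) {
        // Columns actually read from the data file; identity partition
        // columns are not stored in the file itself.
        Map<String, Object> dataColumns = Map.of("id", 1L, "amount", 9.99);

        // Identity partition value taken from the manifest entry.
        Map<String, Object> partitionValues = Map.of("event_date", "2018-06-01");

        // Join: data columns on the left, partition columns on the right,
        // so the joined order may not match the table's declared order.
        Map<String, Object> joined = new LinkedHashMap<>(dataColumns);
        joined.putAll(partitionValues);

        // Project: reorder the joined row to the table's column order.
        List<String> finalOrder = Arrays.asList("id", "event_date", "amount");
        List<Object> row = new ArrayList<>();
        for (String col : finalOrder) {
          row.add(joined.get(col));
        }

        System.out.println(row); // prints [1, 2018-06-01, 9.99]
      }
    }

Projecting after the join, rather than reordering during the read, mirrors the constraint described above: the reused Spark Parquet read path cannot reorder columns, so reordering happens as a final projection step.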

rdblue commented 6 years ago

I think a refactor a while back fixed this. We still need to extend the tests for this in Spark to include ORC.