trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
https://trino.io
Apache License 2.0

Iceberg connector can't analyze equality deletes after dropping a column used in one of the equality delete file #17836

Open findinpath opened 1 year ago

findinpath commented 1 year ago
    @Test
    public void testEqualityDeletesAfterColumnRemoval()
            throws Exception
    {
        String tableName = "test_equality_deletes_after_schema_evolution_" + randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
        Table icebergTable = loadTable(tableName);
        Assertions.assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");
        Path metadataDir = new Path(metastoreDir.toURI());
        TrinoFileSystem fs = HDFS_FILE_SYSTEM_FACTORY.create(SESSION);

        for (int i = 1; i <= 2; i++) {
            String deleteFile1 = "delete_file_" + UUID.randomUUID();
            List<String> firstDeleteFileColumns = ImmutableList.of("regionkey", "name");
            Schema deleteRowSchema = icebergTable.schema().select(firstDeleteFileColumns);
            List<Integer> equalityFieldIds = firstDeleteFileColumns.stream()
                    .map(name -> deleteRowSchema.findField(name).fieldId())
                    .collect(toImmutableList());
            Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(new ForwardingFileIo(fs).newOutputFile(new Path(metadataDir, deleteFile1).toString()))
                    .forTable(icebergTable)
                    .rowSchema(deleteRowSchema)
                    .createWriterFunc(GenericParquetWriter::buildWriter)
                    .equalityFieldIds(equalityFieldIds)
                    .overwrite();
            EqualityDeleteWriter<Record> writer = writerBuilder.buildEqualityWriter();

            Record dataDelete = GenericRecord.create(deleteRowSchema);
            try (Closeable ignored = writer) {
                writer.write(dataDelete.copy(ImmutableMap.of("regionkey", 1L, "name", "BRAZIL")));
            }
            icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
        }

        assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN regionkey");

        assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT (name = 'BRAZIL')");
        assertUpdate("DROP TABLE " + tableName);
    }

fails with the following stack trace:

java.lang.NullPointerException: Cannot invoke "org.apache.iceberg.types.Types$NestedField.type()" because "field" is null
    at org.apache.iceberg.DeleteFileIndex.canContainEqDeletesForFile(DeleteFileIndex.java:214)
    at org.apache.iceberg.DeleteFileIndex.canContainDeletesForFile(DeleteFileIndex.java:158)
    at org.apache.iceberg.DeleteFileIndex.lambda$forDataFile$3(DeleteFileIndex.java:147)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:178)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575)
    at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
    at org.apache.iceberg.DeleteFileIndex.forDataFile(DeleteFileIndex.java:148)
    at org.apache.iceberg.DeleteFileIndex.forEntry(DeleteFileIndex.java:122)
    at org.apache.iceberg.ManifestGroup.lambda$createFileScanTasks$13(ManifestGroup.java:351)
    at org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:202)
    at org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.next(CloseableIterable.java:294)
    at io.trino.plugin.iceberg.IcebergSplitSource.getNextBatch(IcebergSplitSource.java:223)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource.getNextBatch(ClassLoaderSafeConnectorSplitSource.java:43)
    at io.trino.split.ConnectorAwareSplitSource.getNextBatch(ConnectorAwareSplitSource.java:53)
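The failure mechanism can be sketched in a few lines. `DeleteFileIndex.canContainEqDeletesForFile` resolves each equality field id stored in the delete file against the table's *current* schema; after `DROP COLUMN`, that id no longer exists, the lookup returns null, and dereferencing it throws the NPE above. The class and map below are hypothetical simplifications, not Iceberg code:

```java
import java.util.HashMap;
import java.util.Map;

public class EqDeleteLookupSketch {
    // field id -> type name, standing in for the table's current schema
    static final Map<Integer, String> currentSchema = new HashMap<>();

    static String fieldType(int fieldId) {
        // returns null once the column is dropped from the table
        String field = currentSchema.get(fieldId);
        // NPE here, analogous to field.type() in DeleteFileIndex
        return field.toUpperCase();
    }

    public static void main(String[] args) {
        currentSchema.put(1, "long");   // regionkey
        currentSchema.put(2, "string"); // name
        System.out.println(fieldType(1));

        // simulate ALTER TABLE ... DROP COLUMN regionkey
        currentSchema.remove(1);
        try {
            fieldType(1);
        }
        catch (NullPointerException e) {
            System.out.println("NPE: field for id 1 no longer in schema");
        }
    }
}
```

The delete file is immutable and still references field id 1, which is why the planner trips over it long after the column is gone.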

Heltman commented 1 year ago

@findinpath I added a test in Spark on Iceberg and got the same error. I believe this part of the spec is not well followed. See my test:

https://github.com/apache/iceberg/compare/master...Heltman:iceberg:iceberg-eqdelete-drop-column

But the Iceberg spec says:

If a delete column in an equality delete file is later dropped from the table, it must still be used when applying the equality deletes. If a column was added to a table and later used as a delete column in an equality delete file, the column value is read for older data files using normal projection rules (defaults to null). -- https://iceberg.apache.org/spec/#delete-formats
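What the spec asks for can be sketched as follows: equality deletes are matched using the delete file's own row schema, so a column dropped from the table must still participate in the comparison (old data files still carry its value), and a column the data file never had defaults to null under normal projection rules. This is a minimal illustrative sketch, not Iceberg's implementation:

```java
import java.util.Map;
import java.util.Objects;

public class EqDeleteApplySketch {
    // a row is deleted if it matches every column of the equality delete;
    // row.get() yields null for columns absent from the data file, which is
    // the projection default the spec describes
    static boolean isDeleted(Map<String, Object> row, Map<String, Object> delete) {
        return delete.entrySet().stream()
                .allMatch(e -> Objects.equals(row.get(e.getKey()), e.getValue()));
    }

    public static void main(String[] args) {
        // delete keyed on (regionkey, name), as in the test above
        Map<String, Object> delete = Map.of("regionkey", 1L, "name", "BRAZIL");

        // older data files still carry regionkey even after the table drops it
        Map<String, Object> brazil = Map.of("regionkey", 1L, "name", "BRAZIL");
        Map<String, Object> canada = Map.of("regionkey", 1L, "name", "CANADA");

        System.out.println(isDeleted(brazil, delete));
        System.out.println(isDeleted(canada, delete));
    }
}
```

Resolving the delete columns only against the current table schema, as the stack trace shows Iceberg doing, loses exactly this property once a delete column is dropped.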