trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)

Regression about Iceberg DELETE statement for nested field #15178

Open ebyhr opened 1 year ago

ebyhr commented 1 year ago

Steps to reproduce:

CREATE TABLE test (parent row(child int));
INSERT INTO test SELECT row(1);
ALTER TABLE test SET PROPERTIES partitioning = ARRAY['"parent.child"'];

The DELETE statement works in Trino 403:

DELETE FROM test;
DELETE: 1 row

The DELETE statement fails on upstream master regardless of the legacy_update_delete_implementation session property:

SET SESSION legacy_update_delete_implementation=true;
DELETE FROM test;
Query 20221124_111952_00030_wehb9 failed: Cannot find source column for partitioning field 1000: parent.child: identity(2)
java.lang.NullPointerException: Cannot find source column for partitioning field 1000: parent.child: identity(2)
    at java.base/java.util.Objects.requireNonNull(Objects.java:334)
    at io.trino.plugin.iceberg.IcebergMetadata.lambda$getWriteLayout$9(IcebergMetadata.java:741)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:510)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at io.trino.plugin.iceberg.IcebergMetadata.getWriteLayout(IcebergMetadata.java:743)
    at io.trino.plugin.iceberg.IcebergMetadata.getInsertLayout(IcebergMetadata.java:727)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.getInsertLayout(ClassLoaderSafeConnectorMetadata.java:134)
    at io.trino.metadata.MetadataManager.getInsertLayout(MetadataManager.java:785)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.createMergeAnalysis(StatementAnalyzer.java:3385)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitDelete(StatementAnalyzer.java:802)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitDelete(StatementAnalyzer.java:471)
    at io.trino.sql.tree.Delete.accept(Delete.java:61)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:488)
    at io.trino.sql.analyzer.StatementAnalyzer.analyze(StatementAnalyzer.java:450)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:79)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:71)
    at io.trino.execution.SqlQueryExecution.analyze(SqlQueryExecution.java:267)
    at io.trino.execution.SqlQueryExecution.<init>(SqlQueryExecution.java:204)
    at io.trino.execution.SqlQueryExecution$SqlQueryExecutionFactory.createQueryExecution(SqlQueryExecution.java:856)
    at io.trino.dispatcher.LocalDispatchQueryFactory.lambda$createDispatchQuery$0(LocalDispatchQueryFactory.java:138)
    at io.trino.$gen.Trino_testversion____20221124_111841_71.call(Unknown Source)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

SET SESSION legacy_update_delete_implementation=false;
DELETE FROM test;
Query 20221124_111959_00032_wehb9 failed: Cannot find source column for partitioning field 1000: parent.child: identity(2)
    (stack trace identical to the one above)

findepi commented 1 year ago

Marking this as "release-blocker", since it's a functionality regression present in master, but not present in the last release.

findepi commented 1 year ago

@ebyhr did you by any chance bisect the history to determine the cause?

ebyhr commented 1 year ago

@findepi 84848cbafb85c4c78f1b48ebff2e4d7d289dccb1 is the first commit that throws an exception.

findepi commented 1 year ago

> @findepi 84848cb is the first commit that throws an exception.

cc @djsstarburst

djsstarburst commented 1 year ago

This test, transliterated from the SQL in the summary, shows the same problem when added to BaseIcebergConnectorTest:

    @Test
    public void testDeleteAll()
    {
        String tableName = "test_delete_all_" + randomNameSuffix();
        assertUpdate(getSession(), "CREATE TABLE %s (parent row(child int))".formatted(tableName));
        assertUpdate("INSERT INTO %s SELECT row(1)".formatted(tableName), 1);
        assertUpdate(getSession(), "ALTER TABLE %s SET PROPERTIES partitioning = ARRAY['\"parent.child\"']".formatted(tableName));
        // Fails on master with: Cannot find source column for partitioning field 1000: parent.child: identity(2)
        assertUpdate("DELETE FROM " + tableName, 1);
        assertUpdate(getSession(), "DROP TABLE " + tableName);
    }
djsstarburst commented 1 year ago

As reported, this regression in DELETE was introduced by 84848cb. That commit unconditionally creates a MergeAnalysis instance when analyzing a DELETE, whether or not we are planning the DELETE using MERGE components. When creating the MergeAnalysis instance, the code calls metadata.getInsertLayout(). For Iceberg, this calls IcebergMetadata.getWriteLayout(). getWriteLayout() enumerates the ids of the top-level columns in the table, but does not include the ids of columns nested inside of structs. But the subsequent code in getWriteLayout() expects the id of the nested child column to appear in the enumeration, because it is a partition key.

During analysis for legacy DELETE, there is no need to call createMergeAnalysis() at all. However, as @ebyhr reported, the bug happens whether or not we are running legacy DELETE. I'm guessing we have the same bug in Iceberg UPDATE and MERGE.

Bottom line: we need to figure out how to bring the Iceberg enumeration of column ids into alignment with what IcebergMetadata.getWriteLayout() is expecting. I tried including all columns, including nested columns, in the map of column id to column handle used by getWriteLayout(), but that fails in the INSERT before we ever get to the DELETE.
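
To make the mismatch concrete, here is a minimal standalone sketch using the Iceberg API directly. This is not Trino code: the class name is made up, the field ids are chosen to mirror the reproduction, and the map stands in for the top-level-only enumeration described above. The point is that the partition field's source id refers to the nested child column, which never appears among the top-level column ids.

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class NestedPartitionSourceLookup
{
    public static void main(String[] args)
    {
        // Mirrors the reproduction: parent row(child int), partitioned by "parent.child".
        // Field ids: parent = 1 (top-level), parent.child = 2 (nested).
        Schema schema = new Schema(
                Types.NestedField.optional(1, "parent", Types.StructType.of(
                        Types.NestedField.optional(2, "child", Types.IntegerType.get()))));
        PartitionSpec spec = PartitionSpec.builderFor(schema)
                .identity("parent.child")
                .build();

        // Stand-in for the enumeration in getWriteLayout(): only top-level column ids (here just 1).
        Map<Integer, String> topLevelColumnsById = new HashMap<>();
        for (Types.NestedField column : schema.columns()) {
            topLevelColumnsById.put(column.fieldId(), column.name());
        }

        for (PartitionField partitionField : spec.fields()) {
            // sourceId() is 2, the nested child column, which is absent from the map.
            // A requireNonNull over this lookup would fail with a message like the one
            // in the stack trace above; here the lookup simply returns null.
            System.out.println(partitionField + " -> " + topLevelColumnsById.get(partitionField.sourceId()));
        }
    }
}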

djsstarburst commented 1 year ago

This PR fixes the DELETE regression by not creating an insert layout if the operation is a DELETE.

HOWEVER, I'm pretty sure that Iceberg still has a problem with UPDATE and MERGE operations with partition columns that are elements of a struct column. I haven't been able to fix that Iceberg problem. Perhaps @electrum or @alexjo2144 might have suggestions?
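
For reference, a companion test in the same style as testDeleteAll above (hypothetical, not taken from any PR) would exercise the UPDATE path on the nested partition column; whether it should pass, or fail with a clear "not supported" error, is exactly the open question.

    @Test
    public void testUpdateWithNestedPartitionColumn()
    {
        // Hypothetical sketch modeled on testDeleteAll above; the expected behavior
        // on master (success vs. a clear error) is still undecided.
        String tableName = "test_update_nested_partition_" + randomNameSuffix();
        assertUpdate("CREATE TABLE %s (parent row(child int))".formatted(tableName));
        assertUpdate("INSERT INTO %s SELECT row(1)".formatted(tableName), 1);
        assertUpdate("ALTER TABLE %s SET PROPERTIES partitioning = ARRAY['\"parent.child\"']".formatted(tableName));
        assertUpdate("UPDATE %s SET parent = CAST(row(2) AS row(child int)) WHERE parent.child = 1".formatted(tableName), 1);
        assertUpdate("DROP TABLE " + tableName);
    }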

alexjo2144 commented 1 year ago

> Iceberg still has a problem with UPDATE and MERGE operations with partition columns that are elements of a struct column

Yep, @ebyhr has a PR to address that situation. It isn't clear yet whether that should be supported in the first place; Spark has some issues reading those tables.

djsstarburst commented 1 year ago

The PR shows failures in some Hive DELETE and UPDATE tests. If those failures don't appear in the master branch, it doesn't make sense to merge the PR.

martint commented 1 year ago

Does this break anything that should've worked before? From the comments above, my understanding is that nested fields as partition keys shouldn't work anyway. In that case, this wouldn't be a release blocker.

electrum commented 1 year ago

If I understand the change in https://github.com/trinodb/trino/pull/15142 correctly, that would break the INSERT part of this reproduction? Thus we don't need this to be a release blocker?

ebyhr commented 1 year ago

> Does this break anything that should've worked before?

Yes, though I suspect it's not a common issue for Iceberg connector users. The connector doesn't support creating such tables, so hitting this requires either changing the partition definition after table creation or using another query engine (e.g. Spark).

The INSERT part of this reproduction should still work because that statement is executed before the partitioning is set. INSERT into tables partitioned by nested fields didn't work even in 403.

alexjo2144 commented 1 year ago

Partitioning on nested row fields seems to be pretty broken across the board, both in Trino and in Spark. Deleting the entire contents of these tables happened to work before, but most operations don't and never have. I don't think we need to call this a release blocker.

djsstarburst commented 1 year ago

Please remove the release blocker tag.