trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
https://trino.io
Apache License 2.0
10.47k stars 3.01k forks source link

[DELTA] Failure of a concurrent CTAS query deletes the table's data directory, leaving the successful one in an unqueryable state #24153

Open vinay-kl opened 3 hours ago

vinay-kl commented 3 hours ago

Hello team, we are using v454 in production and are facing the following issue.

When two CTAS queries with the same target table name run concurrently against a Delta Lake table, the one that fails cleans up the table's data directory as part of its rollback. This leaves the table created by the query that succeeded unusable: it can no longer be queried, and it cannot be dropped from Trino either.

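Reproduction steps (the two identical `create table delta.dev.sample1 ...` statements below are run concurrently, in separate sessions; the first succeeds and the second fails):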

trino> create table delta.dev.delete_trino_test_stg1 as SELECT a, b, 20220101 as d FROM UNNEST(SEQUENCE(1, 9001), SEQUENCE(1, 9001)) AS t(a, b);
CREATE TABLE: 9001 rows

trino> create table delta.dev.delete_trino_test_stg2 as SELECT a, b, 20220101 as d FROM UNNEST(SEQUENCE(1, 9001), SEQUENCE(1, 9001)) AS t(a, b);
CREATE TABLE: 9001 rows

trino> create table delta.dev.sample1 as ((select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d) union all (select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d) union all (select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d));
CREATE TABLE: 243054003 rows

trino> create table delta.dev.sample1 as ((select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d) union all (select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d) union all (select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d));

Query 20241117_112931_00025_me7j4, FAILED, 1 node

Query 20241117_112931_00025_me7j4 failed: Failed to write Delta Lake transaction log entry
io.trino.spi.TrinoException: Failed to write Delta Lake transaction log entry
    at io.trino.plugin.deltalake.DeltaLakeMetadata.finishCreateTable(DeltaLakeMetadata.java:1763)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishCreateTable(ClassLoaderSafeConnectorMetadata.java:578)
    at io.trino.tracing.TracingConnectorMetadata.finishCreateTable(TracingConnectorMetadata.java:659)
    at io.trino.metadata.MetadataManager.finishCreateTable(MetadataManager.java:1190)
    at io.trino.tracing.TracingMetadata.finishCreateTable(TracingMetadata.java:629)
    at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4104)
    at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)
    at io.trino.operator.Driver.processInternal(Driver.java:403)
    at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
    at io.trino.operator.Driver.tryWithLock(Driver.java:709)
    at io.trino.operator.Driver.process(Driver.java:298)
    at io.trino.operator.Driver.processForDuration(Driver.java:269)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
    at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
    at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
    at io.trino.$gen.Trino_dev____20241117_111225_2.run(Unknown Source)
    at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
    at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:168)
    at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:155)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.UncheckedIOException: java.nio.file.FileAlreadyExistsException: /tmp/dev/sample1/_delta_log/00000000000000000000.json
    at io.trino.plugin.deltalake.transactionlog.writer.NoIsolationSynchronizer.write(NoIsolationSynchronizer.java:52)
    at io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter.flush(TransactionLogWriter.java:110)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.finishCreateTable(DeltaLakeMetadata.java:1705)
    ... 25 more
Caused by: java.nio.file.FileAlreadyExistsException: /tmp/dev/sample1/_delta_log/00000000000000000000.json
    at io.trino.filesystem.hdfs.HdfsOutputFile.create(HdfsOutputFile.java:104)
    at io.trino.filesystem.hdfs.HdfsOutputFile.create(HdfsOutputFile.java:62)
    at io.trino.filesystem.TrinoOutputFile.create(TrinoOutputFile.java:33)
    at io.trino.filesystem.tracing.TracingOutputFile.lambda$create$0(TracingOutputFile.java:47)
    at io.trino.filesystem.tracing.Tracing.withTracing(Tracing.java:47)
    at io.trino.filesystem.tracing.TracingOutputFile.create(TracingOutputFile.java:47)
    at io.trino.plugin.deltalake.transactionlog.writer.NoIsolationSynchronizer.write(NoIsolationSynchronizer.java:47)
    ... 27 more
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: /tmp/dev/sample1/_delta_log/00000000000000000000.json
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:557)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:595)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:642)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:730)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:709)
    at io.trino.hdfs.TrinoFileSystemCache$FileSystemWrapper.create(TrinoFileSystemCache.java:364)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1229)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1206)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1087)
    at io.trino.filesystem.hdfs.HdfsOutputFile.lambda$create$1(HdfsOutputFile.java:100)
    at io.trino.hdfs.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:27)
    at io.trino.hdfs.HdfsEnvironment.doAs(HdfsEnvironment.java:134)
    at io.trino.filesystem.hdfs.HdfsOutputFile.create(HdfsOutputFile.java:115)
    at io.trino.filesystem.hdfs.HdfsOutputFile.create(HdfsOutputFile.java:100)
    ... 33 more

Rollback stack trace

java.base/java.lang.Thread.getStackTrace(Thread.java:2418)
io.trino.filesystem.hdfs.HdfsFileSystem.deleteDirectory(HdfsFileSystem.java:164)
io.trino.filesystem.switching.SwitchingFileSystem.deleteDirectory(SwitchingFileSystem.java:92)
io.trino.filesystem.tracing.TracingFileSystem.lambda$deleteDirectory$2(TracingFileSystem.java:89)
io.trino.filesystem.tracing.Tracing.lambda$withTracing$1(Tracing.java:38)
io.trino.filesystem.tracing.Tracing.withTracing(Tracing.java:47)
io.trino.filesystem.tracing.Tracing.withTracing(Tracing.java:37)
io.trino.filesystem.tracing.TracingFileSystem.deleteDirectory(TracingFileSystem.java:89)
io.trino.plugin.deltalake.DeltaLakeMetadata.deleteRecursivelyIfExists(DeltaLakeMetadata.java:1608)
io.trino.plugin.deltalake.DeltaLakeMetadata.lambda$beginCreateTable$39(DeltaLakeMetadata.java:1496)
java.base/java.util.Optional.ifPresent(Optional.java:178)
io.trino.plugin.deltalake.DeltaLakeMetadata.rollback(DeltaLakeMetadata.java:3345)
io.trino.plugin.deltalake.DeltaLakeTransactionManager.lambda$rollback$1(DeltaLakeTransactionManager.java:69)
java.base/java.util.Optional.ifPresent(Optional.java:178)
io.trino.plugin.deltalake.DeltaLakeTransactionManager.rollback(DeltaLakeTransactionManager.java:67)
io.trino.plugin.deltalake.DeltaLakeConnector.rollback(DeltaLakeConnector.java:215)
io.trino.metadata.CatalogTransaction.abort(CatalogTransaction.java:93)
io.trino.metadata.CatalogMetadata.safeAbort(CatalogMetadata.java:166)
io.trino.metadata.CatalogMetadata.abort(CatalogMetadata.java:160)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:79)
io.trino.$gen.Trino_dev____20241117_122247_2.run(Unknown Source)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
java.base/java.lang.Thread.run(Thread.java:1570)
vinay-kl commented 2 hours ago

@findinpath For this particular case, could we catch the exception and propagate it without deleting the table's base location directory during rollback?

Also, in https://github.com/trinodb/trino/blob/4c048b3f6decf17871d4c81668487512061f8d76/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java#L1663-L1682 the `_delta_log` directory is also deleted, which needs to be handled as well.