trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Delta Lake connector: DELETE operations fail when multiple are performed simultaneously (or nearly) #16413

Closed vscfreire closed 1 year ago

vscfreire commented 1 year ago

I'm encountering this problem when performing two or more DELETE operations at the same time, or very close to one another.

If I perform the two DELETEs at exactly the same time, both fail with Query XXX failed: Failed to write Delta Lake transaction log entry. If I perform them in quick succession, only the second one fails. Immediately afterwards, I can re-run the second one successfully.

This also happens with several DELETE operations in quick succession: only the first one completes, and the remaining ones fail.
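
For reference, here is a minimal sketch of the failing pattern; the catalog, schema, table, and predicates are hypothetical placeholders, not my actual objects:

-- session 1
DELETE FROM delta.sales.events WHERE event_date = DATE '2023-02-01';

-- session 2, issued at (nearly) the same time against the same table
DELETE FROM delta.sales.events WHERE event_date = DATE '2023-02-02';

-- Only one of the two manages to commit its transaction log entry;
-- the other fails with "Failed to write Delta Lake transaction log entry".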

The error type is EXTERNAL and the error code is DELTA_LAKE_BAD_WRITE (84279299).

The stack trace, obtained from the web console, is:

io.trino.spi.TrinoException: Failed to write Delta Lake transaction log entry
    at io.trino.plugin.deltalake.DeltaLakeMetadata.finishWrite(DeltaLakeMetadata.java:1967)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.finishDelete(DeltaLakeMetadata.java:1424)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishDelete(ClassLoaderSafeConnectorMetadata.java:652)
    at io.trino.metadata.MetadataManager.finishDelete(MetadataManager.java:1042)
    at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4062)
    at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)
    at io.trino.operator.Driver.processInternal(Driver.java:411)
    at io.trino.operator.Driver.lambda$process$10(Driver.java:314)
    at io.trino.operator.Driver.tryWithLock(Driver.java:706)
    at io.trino.operator.Driver.process(Driver.java:306)
    at io.trino.operator.Driver.processForDuration(Driver.java:277)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:739)
    at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:164)
    at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:519)
    at io.trino.$gen.Trino_403____20230210_115917_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException: Transaction log locked(2); lockingCluster=trino-403-1039ffbb-daf8-48b0-b9a4-326c125f1706; lockingQuery=20230213_153942_00052_5yrxi; expires=2023-02-13T15:44:42.742Z
    at io.trino.plugin.deltalake.transactionlog.writer.S3TransactionLogSynchronizer.write(S3TransactionLogSynchronizer.java:128)
    at io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter.flush(TransactionLogWriter.java:104)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.finishWrite(DeltaLakeMetadata.java:1958)
    ... 17 more

I can provide more extensive data related to the error, if needed.

The Delta tables are stored on MinIO and accessed using s3a. Apart from this problem, everything works correctly and with good performance. The delta.enable-non-concurrent-writes option in the connector configuration was set to true.
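
For context, the catalog configuration is roughly like the following sketch; the metastore URI, endpoint, and credentials are placeholders, and delta.enable-non-concurrent-writes is the only setting relevant to this report:

connector.name=delta_lake
hive.metastore.uri=thrift://example-metastore:9083
# MinIO reached through the S3-compatible file system (placeholder values)
hive.s3.endpoint=http://example-minio:9000
hive.s3.aws-access-key=EXAMPLE_ACCESS_KEY
hive.s3.aws-secret-key=EXAMPLE_SECRET_KEY
hive.s3.path-style-access=true
# required for writes (including DELETE) against Delta tables on S3-compatible storage
delta.enable-non-concurrent-writes=true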

Is this setting related to the situation above? Is there anything we should do to allow concurrent writes on Delta tables?

Thanks.

findinpath commented 1 year ago

@vscfreire what you are experiencing is the expected behaviour.

If there are two concurrent queries performing changes on the table, only the first query will succeed. Did you experience different behaviour in another query engine?

Do note that delta.enable-non-concurrent-writes is about acknowledging that the table may get corrupted if another query engine is used to write to it alongside Trino.

vscfreire commented 1 year ago

Hello, @findinpath, sorry for the delayed response.

I haven't used other query engines like Trino, only relational databases, where this kind of concurrent operation is not a problem.

I have seen in the documentation that the delta.enable-non-concurrent-writes flag is a warning not to use other query engines alongside Trino; I only mentioned it because it's a required flag for writes (including DELETE) to work on Delta tables on S3.

Do you know if this is a limitation of the implementation at the Trino level, or if it's due to limitations in the underlying layers (i.e. Delta Lake, S3, etc.)?

If it's a Trino limitation, I'd suggest that the documentation mention it, to avoid future confusion. Currently, it only states that «You can use the connector to INSERT, DELETE, UPDATE, and MERGE data in Delta Lake tables». Even if the limitation comes from the underlying layers, the documentation could still mention it.

Thanks.

findinpath commented 1 year ago

I'm quoting @findepi from a private Slack discussion

Trino could have “concurrent writer reconciliation logic”, but it does not implement any. This is a current limitation, not the long-term desired outcome for the Delta Lake connector.

@vscfreire would you be comfortable creating a PR yourself to improve the wording about this limitation?

findepi commented 1 year ago

yes, this is a known limitation. I couldn't find an issue about this, so I created a new one: https://github.com/trinodb/trino/issues/16985

yes, delta.enable-non-concurrent-writes is unrelated. please do not set that property.

Thelin90 commented 1 year ago

Hello.

I have seen this behaviour for a while now.

You don't see this behaviour in Spark or delta-rs when you do concurrent appends.

The Delta connector does not seem to lock in the correct Delta metadata version the way other engines do.

I have also raised this with Starburst, since I use Galaxy at work, but I have seen the same behaviour when I tried the latest open source release.

semeteycoskun commented 1 year ago

It writes the same error logs for the UPDATE operation: DELTA_LAKE_BAD_WRITE:

2023-07-29T20:53:43.817Z    ERROR   stage-scheduler io.trino.execution.scheduler.PipelinedQueryScheduler    Failure in distributed stage for query 20230729_205336_00230_cmyyf
io.trino.spi.TrinoException: Unable to rewrite Parquet file
    at io.trino.plugin.deltalake.DeltaLakeMergeSink.rewriteFile(DeltaLakeMergeSink.java:353)
    at io.trino.plugin.deltalake.DeltaLakeMergeSink.lambda$finish$6(DeltaLakeMergeSink.java:310)
    at java.base/java.util.HashMap.forEach(HashMap.java:1421)
    at io.trino.plugin.deltalake.DeltaLakeMergeSink.finish(DeltaLakeMergeSink.java:309)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMergeSink.finish(ClassLoaderSafeConnectorMergeSink.java:52)
    at io.trino.operator.MergeWriterOperator.finish(MergeWriterOperator.java:196)
    at io.trino.operator.Driver.processInternal(Driver.java:413)
    at io.trino.operator.Driver.lambda$process$8(Driver.java:298)
    at io.trino.operator.Driver.tryWithLock(Driver.java:694)
    at io.trino.operator.Driver.process(Driver.java:290)
    at io.trino.operator.Driver.processForDuration(Driver.java:261)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:887)
    at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:187)
    at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:555)
    at io.trino.$gen.Trino_421____20230726_070853_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.FileNotFoundException: File does not exist: s3a://MY/LAKEHOUSE/PATTH/date=2023-07-09************
    at io.trino.hdfs.s3.TrinoS3FileSystem.getFileStatus(TrinoS3FileSystem.java:517)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
    at io.trino.filesystem.hdfs.HdfsInputFile.lambda$lazyStatus$2(HdfsInputFile.java:130)
    at io.trino.hdfs.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:25)
    at io.trino.hdfs.HdfsEnvironment.doAs(HdfsEnvironment.java:125)
    at io.trino.filesystem.hdfs.HdfsInputFile.lazyStatus(HdfsInputFile.java:130)
    at io.trino.filesystem.hdfs.HdfsInputFile.length(HdfsInputFile.java:77)
    at io.trino.filesystem.tracing.Tracing.withTracing(Tracing.java:47)
    at io.trino.filesystem.tracing.TracingInputFile.length(TracingInputFile.java:79)
    at io.trino.plugin.hive.parquet.TrinoParquetDataSource.<init>(TrinoParquetDataSource.java:37)
    at io.trino.plugin.deltalake.DeltaLakeWriter.readStatistics(DeltaLakeWriter.java:203)
    at io.trino.plugin.deltalake.DeltaLakeWriter.getDataFileInfo(DeltaLakeWriter.java:197)
    at io.trino.plugin.deltalake.DeltaLakeMergeSink.rewriteParquetFile(DeltaLakeMergeSink.java:461)
    at io.trino.plugin.deltalake.DeltaLakeMergeSink.rewriteFile(DeltaLakeMergeSink.java:347)
    ... 17 more

2023-07-29T20:53:43.823Z INFO dispatcher-query-504 io.trino.event.QueryMonitor TIMELINE: Query 20230729_205336_00230_cmyyf :: FAILED (DELTA_LAKE_BAD_WRITE)


findepi commented 1 year ago

@semeteycoskun yes, the limitation still exists; you may want to track https://github.com/trinodb/trino/issues/16985.