trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Flaky `TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameVersionedTable` #21725

Open ebyhr opened 2 months ago

ebyhr commented 2 months ago
Error:  io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.testConcurrentInsertsSelectingFromTheSameVersionedTable -- Time elapsed: 3.660 s <<< ERROR!
io.trino.testing.QueryFailedException: Failed to write Delta Lake transaction log entry
    at io.trino.testing.AbstractTestingTrinoClient.execute(AbstractTestingTrinoClient.java:133)
    at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:501)
    at io.trino.testing.DistributedQueryRunner.execute(DistributedQueryRunner.java:484)
    at io.trino.testing.QueryRunner.execute(QueryRunner.java:82)
    at io.trino.plugin.deltalake.TestDeltaLakeLocalConcurrentWritesTest.lambda$testConcurrentInsertsSelectingFromTheSameVersionedTable$9(TestDeltaLakeLocalConcurrentWritesTest.java:230)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1570)
    Suppressed: java.lang.Exception: SQL: INSERT INTO test_concurrent_inserts_select_from_same_versioned_table_gu3yeqy900 SELECT 2, 'c' AS part FROM test_concurrent_inserts_select_from_same_versioned_table_gu3yeqy900 FOR VERSION AS OF 0
        at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:508)
        ... 7 more
Caused by: io.trino.spi.TrinoException: Failed to write Delta Lake transaction log entry
    at io.trino.plugin.deltalake.DeltaLakeMetadata.finishInsert(DeltaLakeMetadata.java:2028)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:633)
    at io.trino.tracing.TracingConnectorMetadata.finishInsert(TracingConnectorMetadata.java:718)
    at io.trino.metadata.MetadataManager.finishInsert(MetadataManager.java:1169)
    at io.trino.tracing.TracingMetadata.finishInsert(TracingMetadata.java:706)
    at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4113)
    at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)
    at io.trino.operator.Driver.processInternal(Driver.java:403)
    at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
    at io.trino.operator.Driver.tryWithLock(Driver.java:709)
    at io.trino.operator.Driver.process(Driver.java:298)
    at io.trino.operator.Driver.processForDuration(Driver.java:269)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
    at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:76)
    at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
    at io.trino.$gen.Trino_testversion____20240426_081326_177.run(Unknown Source)
    at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
    at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:174)
    at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:161)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: io.trino.spi.TrinoException: Error reading statistics from cache
    at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.readExtendedStatistics(CachingExtendedStatisticsAccess.java:67)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.updateTableStatistics(DeltaLakeMetadata.java:3592)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.finishInsert(DeltaLakeMetadata.java:2010)
    ... 25 more
Caused by: java.lang.IllegalArgumentException: Invalid JSON bytes for [simple type, class io.trino.plugin.deltalake.statistics.ExtendedStatistics]
    at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:196)
    at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:75)
    at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:65)
    at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.lambda$readExtendedStatistics$0(CachingExtendedStatisticsAccess.java:63)
    at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4938)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3576)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2318)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2191)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2081)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4019)
    at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4933)
    at io.trino.cache.EvictableCache.get(EvictableCache.java:112)
    at io.trino.cache.CacheUtils.uncheckedCacheGet(CacheUtils.java:37)
    at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.readExtendedStatistics(CachingExtendedStatisticsAccess.java:63)
    ... 27 more
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1767)
    at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:360)
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2115)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1603)
    at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:193)
    ... 40 more

https://github.com/trinodb/trino/actions/runs/8845096149/job/24288361644

findinpath commented 2 months ago

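The following part of the stack trace shows that the JSON content being read is corrupted. The root cause, `No content to map due to end-of-input`, means the extended statistics file was empty when it was read: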

Caused by: java.lang.IllegalArgumentException: Invalid JSON bytes for [simple type, class io.trino.plugin.deltalake.statistics.ExtendedStatistics]
    at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:196)
    at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:75)
    at io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.readExtendedStatistics(MetaDirStatisticsAccess.java:65)
    at io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.lambda$readExtendedStatistics$0(CachingExtendedStatisticsAccess.java:63)

The failure seems to be related to the way `LocalOutputFile` writes content to storage:

https://github.com/trinodb/trino/blob/7f7b5e263f825cbf3c5c0a0074f096ab9b9fe638/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalOutputFile.java#L67-L70

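The content is not overwritten atomically, so a concurrent reader can observe the file in the middle of the overwrite (for example, already emptied but not yet rewritten), which matches the empty-input error above. We need to rethink the implementation of the `createOrOverwrite()` method of `LocalOutputFile`, which tests use to interact with the local file storage.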
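A minimal sketch of how a non-atomic overwrite can expose an empty file to a reader. It does not reproduce the linked `LocalOutputFile` lines; it assumes an overwrite that opens the existing file with `TRUNCATE_EXISTING` and only then writes. `NonAtomicOverwriteDemo` and `sizeSeenMidOverwrite` are hypothetical names. The size checked between opening and writing is what a concurrent reader such as `MetaDirStatisticsAccess.readExtendedStatistics` could observe.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;

// Hypothetical illustration (assumed truncate-then-write overwrite, not the real LocalOutputFile).
final class NonAtomicOverwriteDemo
{
    private NonAtomicOverwriteDemo() {}

    // Returns the file size a concurrent reader would see between truncation and the write.
    static long sizeSeenMidOverwrite(Path file, byte[] newContent)
            throws IOException
    {
        long observed;
        try (OutputStream out = Files.newOutputStream(file, CREATE, TRUNCATE_EXISTING, WRITE)) {
            // The old content is already gone here; a JSON reader at this moment gets
            // empty input, which Jackson reports as "No content to map due to end-of-input".
            observed = Files.size(file);
            out.write(newContent);
        }
        return observed;
    }

    public static void main(String[] args)
            throws IOException
    {
        Path file = Files.createTempFile("stats", ".json");
        Files.write(file, "{\"old\":true}".getBytes(StandardCharsets.UTF_8));
        long size = sizeSeenMidOverwrite(file, "{\"new\":true}".getBytes(StandardCharsets.UTF_8));
        System.out.println("size observed mid-overwrite: " + size);
        System.out.println("final content: " + Files.readString(file));
        Files.delete(file);
    }
}
```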

cc @electrum

ebyhr commented 1 month ago

https://github.com/trinodb/trino/actions/runs/8995057281/job/24709451666

electrum commented 1 month ago

We could change this to write to a temporary file and then rename it to the target name. The temporary file could be named `.tmp.$RANDOM.$ORIGINAL`.
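A sketch of the write-to-temporary-file-then-rename idea using plain `java.nio`, not the actual `LocalOutputFile` change. `AtomicOverwriteSketch` and its method are hypothetical names, and the `$RANDOM` part of the name is assumed to be a UUID. The temporary file is created next to the target so the rename stays within one file system.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

// Hypothetical sketch: write to a sibling ".tmp.<random>.<original>" file,
// then atomically rename it over the target.
final class AtomicOverwriteSketch
{
    private AtomicOverwriteSketch() {}

    static void createOrOverwrite(Path target, byte[] data)
            throws IOException
    {
        Path absolute = target.toAbsolutePath();
        Path parent = absolute.getParent();
        // Assumption: "create" should also create missing parent directories.
        Files.createDirectories(parent);
        // Same directory as the target, so the rename never crosses file systems.
        Path temp = parent.resolve(".tmp." + UUID.randomUUID() + "." + absolute.getFileName());
        try {
            // CREATE_NEW fails rather than clobbering an unexpected existing file.
            Files.write(temp, data, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            // Where this maps to rename(2), readers see the old or the new bytes, never a partial file.
            Files.move(temp, absolute, StandardCopyOption.ATOMIC_MOVE);
        }
        finally {
            // No-op after a successful move; removes the temporary file if writing or renaming failed.
            Files.deleteIfExists(temp);
        }
    }

    public static void main(String[] args)
            throws IOException
    {
        Path dir = Files.createTempDirectory("delta-stats");
        Path target = dir.resolve("stats.json");
        createOrOverwrite(target, "{\"version\":1}".getBytes(StandardCharsets.UTF_8));
        createOrOverwrite(target, "{\"version\":2}".getBytes(StandardCharsets.UTF_8));
        System.out.println(Files.readString(target));
        Files.delete(target);
        Files.delete(dir);
    }
}
```

Per the `Files.move` Javadoc, whether `ATOMIC_MOVE` replaces an existing target is implementation specific. On Linux and macOS it is implemented with `rename(2)`, which replaces the target atomically, so a concurrent reader never sees an empty file.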

ebyhr commented 3 weeks ago

Reoccurred on master: https://github.com/trinodb/trino/actions/runs/9357870915/job/25758647303

@findinpath Do you have any updates?