trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Delta Lake: Trino fails to write checkpoint on tables containing `Infinity` #24029

Open Pluies opened 2 weeks ago

Hi folks,

We've recently noticed an issue where the jobs optimizing our Delta Lake tables were silently failing to write checkpoints.

The query appears to complete successfully:

$ ALTER TABLE foo EXECUTE optimize;

ALTER TABLE EXECUTE
 rows
------
(0 rows)

Query 20241025_143716_00027_7d36b, FINISHED, 2 nodes
Splits: 8 total, 8 done (100.00%)
0.22 [0 rows, 0B] [0 rows/s, 0B/s]

However, when we looked at the Delta table, no new checkpoint had been created. Over time this led to very long analysis times, once a few thousand log entries had piled up without a checkpoint 😬.
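One way to spot this condition on a table is to compare the latest commit version in `_delta_log` with the latest checkpoint version. The following is a minimal Java sketch under a few assumptions: the table's `_delta_log` directory is reachable as a local path (for S3 or MinIO you would list the prefix with your object-store client instead), and checkpoints use the classic Delta Lake naming, `<version>.checkpoint.parquet` or multi-part `<version>.checkpoint.<part>.<parts>.parquet`, with versions zero-padded to 20 digits. V2 checkpoints with UUID file names are not matched. `CheckpointLag` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class CheckpointLag {
    // Commit files: 20-digit zero-padded version followed by ".json"
    static final Pattern COMMIT = Pattern.compile("(\\d{20})\\.json");
    // Classic single-file or multi-part checkpoints (assumed naming, see above)
    static final Pattern CHECKPOINT = Pattern.compile("(\\d{20})\\.checkpoint(\\.\\d+\\.\\d+)?\\.parquet");

    // Highest version among file names in deltaLog that fully match the pattern
    static OptionalLong latestVersion(Path deltaLog, Pattern pattern) throws IOException {
        try (Stream<Path> files = Files.list(deltaLog)) {
            return files
                    .map(path -> pattern.matcher(path.getFileName().toString()))
                    .filter(Matcher::matches)
                    .mapToLong(matcher -> Long.parseLong(matcher.group(1)))
                    .max();
        }
    }

    // Number of commits written after the latest checkpoint (-1 stands for "none")
    static long commitsSinceCheckpoint(Path deltaLog) throws IOException {
        long commit = latestVersion(deltaLog, COMMIT).orElse(-1);
        long checkpoint = latestVersion(deltaLog, CHECKPOINT).orElse(-1);
        return commit - checkpoint;
    }

    public static void main(String[] args) throws IOException {
        Path deltaLog = Path.of(args[0], "_delta_log");
        System.out.println("commits since last checkpoint: " + commitsSinceCheckpoint(deltaLog));
    }
}
```

Run it as `java CheckpointLag.java /path/to/table`; a number that keeps growing after every `optimize` matches the symptom described above.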

Looking into it, we found the following error in the coordinator log:

2024-11-04T12:39:37.774Z    ERROR   20241104_123936_00273_m45u9.0.0.0-16-159    io.trino.plugin.deltalake.DeltaLakeMetadata Failed to write checkpoint for table foo for version 1234

And the full stack trace:

java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Double (java.lang.String and java.lang.Double are in module java.base of loader 'bootstrap')
    at io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue(DeltaLakeParquetStatisticsUtils.java:131)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.lambda$preprocessMinMaxValues$15(CheckpointWriter.java:434)
    at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
    at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.lambda$preprocessMinMaxValues$16(CheckpointWriter.java:428)
    at java.base/java.util.Optional.map(Optional.java:260)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.preprocessMinMaxValues(CheckpointWriter.java:420)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.writeMinMaxMapAsFields(CheckpointWriter.java:391)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.lambda$writeParsedStats$12(CheckpointWriter.java:375)
    at io.trino.spi.block.RowBlockBuilder.buildEntry(RowBlockBuilder.java:111)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.writeParsedStats(CheckpointWriter.java:361)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.lambda$writeAddFileEntry$5(CheckpointWriter.java:279)
    at io.trino.spi.block.RowBlockBuilder.buildEntry(RowBlockBuilder.java:111)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.writeAddFileEntry(CheckpointWriter.java:251)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter.write(CheckpointWriter.java:165)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager.writeCheckpoint(CheckpointWriterManager.java:158)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.writeCheckpointIfNeeded(DeltaLakeMetadata.java:2766)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.finishOptimize(DeltaLakeMetadata.java:2614)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.finishTableExecute(DeltaLakeMetadata.java:2555)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishTableExecute(ClassLoaderSafeConnectorMetadata.java:239)
    at io.trino.tracing.TracingConnectorMetadata.finishTableExecute(TracingConnectorMetadata.java:177)
    at io.trino.metadata.MetadataManager.finishTableExecute(MetadataManager.java:354)
    at io.trino.tracing.TracingMetadata.finishTableExecute(TracingMetadata.java:227)
    at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4121)
    at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)
    at io.trino.operator.Driver.processInternal(Driver.java:403)
    at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
    at io.trino.operator.Driver.tryWithLock(Driver.java:709)
    at io.trino.operator.Driver.process(Driver.java:298)
    at io.trino.operator.Driver.processForDuration(Driver.java:269)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
    at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
    at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
    at io.trino.$gen.Trino_testversion____20241104_190209_71.run(Unknown Source)
    at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
    at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:168)
    at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:155)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1570)
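The stack trace shows `DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue` receiving a `java.lang.String` where it expected a `java.lang.Double`. JSON has no literal for infinity or NaN, so a plausible (unconfirmed) explanation is that the min/max statistic for `foo` is stored in the file's JSON stats as the string `"Infinity"`. The minimal Java sketch below reproduces the failing cast and shows a tolerant conversion. It is not Trino's code or its fix; `StatsValueConversion` and `toDouble` are hypothetical names, and the sketch needs Java 16+.

```java
import java.util.Map;

public class StatsValueConversion {
    // Hypothetical helper: converts one min/max statistic, as parsed from the
    // JSON stats of an add entry, into a double. Non-finite values are assumed
    // to arrive as the strings "Infinity", "-Infinity" or "NaN".
    static double toDouble(Object jsonValue) {
        if (jsonValue instanceof Number number) { // pattern matching needs Java 16+
            return number.doubleValue();
        }
        if (jsonValue instanceof String text) {
            return switch (text) {
                case "Infinity" -> Double.POSITIVE_INFINITY;
                case "-Infinity" -> Double.NEGATIVE_INFINITY;
                case "NaN" -> Double.NaN;
                default -> throw new IllegalArgumentException("Unexpected string statistic: " + text);
            };
        }
        throw new IllegalArgumentException("Unsupported statistic value: " + jsonValue);
    }

    public static void main(String[] args) {
        // Assumed shape of the parsed maxValues for the reproduction table below
        Map<String, Object> maxValues = Map.of("id", 1, "foo", "Infinity");
        Object raw = maxValues.get("foo");
        try {
            double naive = (Double) raw; // a direct cast, as the stack trace suggests
            System.out.println("naive cast succeeded: " + naive);
        } catch (ClassCastException e) {
            System.out.println("naive cast failed: " + e.getClass().getSimpleName());
        }
        System.out.println("tolerant conversion: " + toDouble(raw));
    }
}
```

Running `main` prints `naive cast failed: ClassCastException` followed by `tolerant conversion: Infinity`. Restricting the accepted strings to the three non-finite spellings, rather than parsing any string, keeps genuinely malformed stats from being silently accepted.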

Expected outcome

Table is optimized and a new checkpoint is written.

Actual outcome

A failure that is silent from the user's point of view: the checkpoint fails to write, but the query still completes successfully.
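The coordinator logs the exception, yet the query reports `FINISHED`. That suggests checkpoint writing after the commit is treated as best-effort: failures are caught and logged rather than propagated to the query. The Java sketch below illustrates that pattern under this assumption only; it is not Trino's code, and `SilentCheckpointFailure`, `writeCheckpoint`, and `tryWriteCheckpoint` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class SilentCheckpointFailure {
    private static final Logger LOG = Logger.getLogger(SilentCheckpointFailure.class.getName());

    // Hypothetical stand-in for the checkpoint writer; it always fails the same
    // way as in the report, simulating a table whose stats contain "Infinity".
    static void writeCheckpoint(String table, long version) {
        throw new ClassCastException("class java.lang.String cannot be cast to class java.lang.Double");
    }

    // Best-effort checkpointing: the commit is assumed to have already succeeded,
    // so a failure is logged and reported as false instead of failing the query.
    static boolean tryWriteCheckpoint(String table, long version) {
        try {
            writeCheckpoint(table, version);
            return true;
        } catch (RuntimeException e) {
            LOG.log(Level.SEVERE, "Failed to write checkpoint for table " + table + " for version " + version, e);
            return false;
        }
    }

    public static void main(String[] args) {
        boolean checkpointed = tryWriteCheckpoint("foo", 1234);
        // The "query" still finishes; only the log and the flag reveal the problem
        System.out.println("query finished, checkpoint written: " + checkpointed);
    }
}
```

Returning a flag instead of only logging is one way a caller could surface a warning to the user while still treating the already-committed optimize as successful.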

Reproduction steps

Assuming a Delta Lake catalog called `delta` and a schema called `schema`:

CREATE OR REPLACE TABLE delta.schema.test_optimize (id int, foo double);
INSERT INTO delta.schema.test_optimize VALUES (1, cast('Infinity' as double));
ALTER TABLE delta.schema.test_optimize EXECUTE optimize;

Confirmed with both S3 and MinIO as storage backends.

Detected in Trino 453 (with a few patches) and confirmed on vanilla upstream Trino 463.