prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0

Reading Delta Lake table from S3 is slow if _last_checkpoint does not exist #18898

Open dnskr opened 1 year ago

dnskr commented 1 year ago

There is a performance issue when I read a newly created Delta Lake table stored on S3: every access to the Delta table takes more than 15 seconds.

The issue is caused by the loadMetadataFromFile(...) method in the Delta Lake Connectors project. The method expects a FileNotFoundException if the _last_checkpoint file is missing, which is the case for tables with fewer than 10 updates (by default a checkpoint, and with it _last_checkpoint, is only written every 10 commits). See Checkpoints.scala#L133
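The control flow described above can be modeled with a minimal, hypothetical Java sketch. It assumes, as the log messages below suggest, that a FileNotFoundException is treated as "no checkpoint yet" while any other failure is treated as a partial read that is retried after a sleep. The class name, MAX_TRIES = 3 and the 1-second back-off are illustrative assumptions, not the actual Delta Standalone code.

```java
import java.io.FileNotFoundException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

// Hypothetical, simplified model of how a _last_checkpoint reader can react to
// failures. MAX_TRIES and BACKOFF_MILLIS are illustrative assumptions, not
// values taken from Delta Standalone.
final class LastCheckpointLoader {
    static final int MAX_TRIES = 3;
    static final long BACKOFF_MILLIS = 1000;

    private final LongConsumer sleeper; // e.g. a wrapper around Thread.sleep
    int attempts = 0;                   // how many times the file read was attempted
    long sleptMillis = 0;               // total back-off time requested
    boolean fellBackToListing = false;  // true once the reader gives up on the file

    LastCheckpointLoader(LongConsumer sleeper) {
        this.sleeper = sleeper;
    }

    /** Returns the file contents, or empty when there is no usable _last_checkpoint. */
    Optional<String> load(Callable<String> readFile) {
        for (int tries = 1; ; tries++) {
            attempts++;
            try {
                return Optional.of(readFile.call());
            }
            catch (FileNotFoundException e) {
                // Fast path: no _last_checkpoint yet, which is normal for young tables.
                return Optional.empty();
            }
            catch (Exception e) {
                if (tries < MAX_TRIES) {
                    // Slow path: assume a partial read, wait, and try again.
                    sleptMillis += BACKOFF_MILLIS;
                    sleeper.accept(BACKOFF_MILLIS);
                    continue;
                }
                // Give up on the file; the real reader would list _delta_log instead.
                fellBackToListing = true;
                return Optional.empty();
            }
        }
    }
}
```

With a real sleeper such as `ms -> { try { Thread.sleep(ms); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } }`, a plain FileNotFoundException returns after a single attempt, while any other exception type pays for every back-off and then the fallback, which is consistent with the "Sleeping and trying again" warnings in the logs below.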

I created issue https://github.com/delta-io/connectors/issues/437 in the Delta Lake Connectors repo, where it was suggested to fix this in Presto by throwing a FileNotFoundException if the file does not exist on S3.
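A minimal sketch of the suggested fix, assuming the object-store client reports a missing key as an exception carrying HTTP status 404 (the log below shows `Status Code: 404; Error Code: NoSuchKey`). ObjectStoreException and MissingObjectTranslator are hypothetical stand-ins, not AWS SDK or PrestoS3FileSystem types; the sketch only shows a 404 being surfaced as java.io.FileNotFoundException, the exception Hadoop-style callers expect when opening a path that does not exist.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

// Hypothetical stand-in for an SDK service exception (NOT the AWS SDK type).
final class ObjectStoreException extends Exception {
    final int statusCode;

    ObjectStoreException(int statusCode, String message) {
        super(message);
        this.statusCode = statusCode;
    }
}

// Hypothetical helper illustrating the status-code mapping only.
final class MissingObjectTranslator {
    static final int HTTP_NOT_FOUND = 404;

    /** Maps a failed GET to the IOException type that file-system callers expect. */
    static IOException translate(String path, ObjectStoreException e) {
        if (e.statusCode == HTTP_NOT_FOUND) {
            // Callers such as a checkpoint loader can now catch FileNotFoundException.
            FileNotFoundException notFound = new FileNotFoundException("File does not exist: " + path);
            notFound.initCause(e); // keep the original service error for debugging
            return notFound;
        }
        // Every other failure stays a generic IOException.
        return new IOException("Failed to open " + path + " (HTTP " + e.statusCode + ")", e);
    }
}
```

Only the missing-key case is converted; other status codes remain plain IOExceptions, so genuine access or network errors are not mistaken for "file does not exist".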

I created PR https://github.com/prestodb/presto/pull/18307, which was merged, but unfortunately it did not solve the issue. Evidently I missed something else that needed to change, i.e. the PR is incomplete.

Error log before the PR:

2022-09-07T13:51:16.040Z        WARN    Query-20220907_135115_00006_up6te-593   io.delta.standalone.internal.DeltaLogImpl       Failed to parse s3a://data-warehouse/my_database/my_table/_delta_log/_last_checkpoint. This may happen if there was an error during read operation, or a file appears to be partial. Sleeping and trying again.
com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: 1712984EA7667136; S3 Extended Request ID: 0aa37532-b31a-403d-9632-343d5975c4e9), S3 Extended Request ID: 0aa37532-b31a-403d-9632-343d5975c4e9 (Path: s3a://data-warehouse/my_database/my_table/_delta_log/_last_checkpoint)
        at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1114)
        at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
        at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1099)
        ...

Error log after the PR:

2023-01-04T16:09:56.528Z        WARN    20230104_160933_13352_s8vjw.2.0.0-0-105 io.delta.standalone.internal.DeltaLogImpl       Failed to parse s3a://data-warehouse/my_database/my_table/_delta_log/_last_checkpoint. This may happen if there was an error during read operation, or a file appears to be partial. Sleeping and trying again.
java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: s3a://data-warehouse/my_database/my_table/_delta_log/_last_checkpoint
        at io.delta.storage.LineCloseableIterator.hasNext(LineCloseableIterator.java:72)
        at io.delta.storage.LineCloseableIterator.next(LineCloseableIterator.java:78)
        at io.delta.storage.LineCloseableIterator.next(LineCloseableIterator.java:29)
        ...
2023-01-04T16:09:57.532Z        WARN    20230104_160933_13352_s8vjw.2.0.0-0-105 io.delta.standalone.internal.DeltaLogImpl       s3a://data-warehouse/my_database/my_table/_delta_log/_last_checkpoint is corrupted. Will search the checkpoint files directly
java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: s3a://data-warehouse/my_database/my_table/_delta_log/_last_checkpoint
        at io.delta.storage.LineCloseableIterator.hasNext(LineCloseableIterator.java:72)
        at io.delta.storage.LineCloseableIterator.next(LineCloseableIterator.java:78)
        at io.delta.storage.LineCloseableIterator.next(LineCloseableIterator.java:29)
        ...

dnskr commented 1 year ago

@vkorukanti and @agrawalreetika, I would greatly appreciate it if you could look into this issue and help resolve it.

agrawalreetika commented 1 year ago

Hi @dnskr, sorry for the delayed response. I tried to debug this, and I see that your changes work fine as long as Presto uses Delta version delta-standalone_2.12:0.3.0. In that version, https://github.com/delta-io/connectors/blob/v0.3.0/standalone/src/main/java/io/delta/standalone/data/CloseableIterator.java is an interface extended by https://github.com/delta-io/connectors/blob/v0.3.0/standalone/src/main/scala/io/delta/standalone/internal/storage/LineCloseableIterator.scala#L40, and its hasNext() does not throw any extra exception.

However, with any newer Delta version in Presto, e.g. starting from delta-standalone_2.12:0.5.0, the iterator is the one implemented in io.delta:delta-storage:2.0.0 (https://github.com/delta-io/delta/blob/v2.0.0/storage/src/main/java/io/delta/storage/LineCloseableIterator.java#L71), whose hasNext() wraps the FileNotFoundException in an UncheckedIOException. That is why the exception no longer matches the FileNotFoundException case here: https://github.com/delta-io/connectors/blob/v0.5.0/standalone/src/main/scala/io/delta/standalone/internal/Checkpoints.scala#L141
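The mismatch can be reproduced with a minimal, hypothetical Java sketch (not the delta-storage source). Because Iterator.hasNext() cannot declare checked exceptions, an iterator that reads lines has to wrap an IOException such as FileNotFoundException in an UncheckedIOException. A check on the thrown exception's own type then misses it, while a check that walks getCause() still finds it. WrappingLineIterator, LineSource and MissingFileCheck are illustrative names.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical line iterator in the style described above: hasNext() may only
// throw unchecked exceptions, so I/O failures are wrapped.
final class WrappingLineIterator implements Iterator<String> {
    interface LineSource {
        String readLine() throws IOException; // returns null at end of input
    }

    private final LineSource source;
    private String nextLine;
    private boolean fetched;

    WrappingLineIterator(LineSource source) {
        this.source = source;
    }

    @Override
    public boolean hasNext() {
        if (!fetched) {
            try {
                nextLine = source.readLine();
            }
            catch (IOException e) {
                // A FileNotFoundException survives only as the cause of this wrapper.
                throw new UncheckedIOException(e);
            }
            fetched = true;
        }
        return nextLine != null;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        fetched = false;
        return nextLine;
    }
}

final class MissingFileCheck {
    /** Like a Scala `case _: FileNotFoundException`: looks at the thrown type only. */
    static boolean matchesDirectly(Exception e) {
        return e instanceof FileNotFoundException;
    }

    /** Also sees through wrappers such as UncheckedIOException. */
    static boolean matchesThroughCauses(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof FileNotFoundException) {
                return true;
            }
        }
        return false;
    }
}
```

In Java itself, a literal `catch (FileNotFoundException e)` around `next()` would not even compile, since the try block cannot throw that checked exception; the instanceof check stands in for the Scala pattern match.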

I checked on the Presto side whether this java.io.UncheckedIOException: java.io.FileNotFoundException could be caught in PrestoS3FileSystem, but I didn't find that wrapped exception arriving there.

@vkorukanti, please correct me if I am missing something here, and let us know whether this is something we can still handle on the Presto side.

vikram-naik commented 1 year ago

I'm using the setup below and am getting the exceptions above.

I'm able to query the data via presto-cli; however, the Presto server logs the errors below and delays the output.

2023-04-08T14:50:13.878+0530 WARN Query-20230408_092001_00006_c3g7f-192 io.delta.standalone.internal.DeltaLogImpl Failed to parse s3a://delta-lake/delta-table/_delta_log/_last_checkpoint. This may happen if there was an error during read operation, or a file appears to be partial. Sleeping and trying again.
java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: s3a://delta-lake/delta-table/_delta_log/_last_checkpoint
        at io.delta.storage.LineCloseableIterator.hasNext(LineCloseableIterator.java:72)
        at io.delta.storage.LineCloseableIterator.next(LineCloseableIterator.java:78)
        at io.delta.storage.LineCloseableIterator.next(LineCloseableIterator.java:29)
        at io.delta.standalone.internal.Checkpoints.loadMetadataFromFile(Checkpoints.scala:138)
        at io.delta.standalone.internal.Checkpoints.lastCheckpoint(Checkpoints.scala:110)
        at io.delta.standalone.internal.Checkpoints.lastCheckpoint$(Checkpoints.scala:109)
        at io.delta.standalone.internal.DeltaLogImpl.lastCheckpoint(DeltaLogImpl.scala:42)
        at io.delta.standalone.internal.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:218)
        at io.delta.standalone.internal.SnapshotManagement.$init$(SnapshotManagement.scala:37)
        at io.delta.standalone.internal.DeltaLogImpl.<init>(DeltaLogImpl.scala:47)
        at io.delta.standalone.internal.DeltaLogImpl$.apply(DeltaLogImpl.scala:263)
        at io.delta.standalone.internal.DeltaLogImpl$.forTable(DeltaLogImpl.scala:245)
        at io.delta.standalone.internal.DeltaLogImpl.forTable(DeltaLogImpl.scala)
        at io.delta.standalone.DeltaLog.forTable(DeltaLog.java:176)
        at com.facebook.presto.delta.DeltaClient.loadDeltaTableLog(DeltaClient.java:151)
        at com.facebook.presto.delta.DeltaClient.listFiles(DeltaClient.java:122)
        at com.facebook.presto.delta.DeltaSplitManager$DeltaSplitSource.<init>(DeltaSplitManager.java:80)
        at com.facebook.presto.delta.DeltaSplitManager.getSplits(DeltaSplitManager.java:66)
        at com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager.getSplits(ClassLoaderSafeConnectorSplitManager.java:41)
        at com.facebook.presto.split.SplitManager.getSplits(SplitManager.java:89)
        at com.facebook.presto.split.CloseableSplitSourceProvider.getSplits(CloseableSplitSourceProvider.java:52)
        at com.facebook.presto.sql.planner.SplitSourceFactory$Visitor.lambda$visitTableScan$0(SplitSourceFactory.java:157)
        at com.facebook.presto.sql.planner.LazySplitSource.getDelegate(LazySplitSource.java:96)
        at com.facebook.presto.sql.planner.LazySplitSource.getConnectorId(LazySplitSource.java:48)
        at com.facebook.presto.execution.scheduler.SectionExecutionFactory.createStageScheduler(SectionExecutionFactory.java:281)
        at com.facebook.presto.execution.scheduler.SectionExecutionFactory.createStreamingLinkedStageExecutions(SectionExecutionFactory.java:243)
        at com.facebook.presto.execution.scheduler.SectionExecutionFactory.createStreamingLinkedStageExecutions(SectionExecutionFactory.java:221)
        at com.facebook.presto.execution.scheduler.SectionExecutionFactory.createSectionExecutions(SectionExecutionFactory.java:167)
        at com.facebook.presto.execution.scheduler.LegacySqlQueryScheduler.createStageExecutions(LegacySqlQueryScheduler.java:354)
        at com.facebook.presto.execution.scheduler.LegacySqlQueryScheduler.<init>(LegacySqlQueryScheduler.java:243)
        at com.facebook.presto.execution.scheduler.LegacySqlQueryScheduler.createSqlQueryScheduler(LegacySqlQueryScheduler.java:172)
        at com.facebook.presto.execution.SqlQueryExecution.planDistribution(SqlQueryExecution.java:611)
        at com.facebook.presto.execution.SqlQueryExecution.start(SqlQueryExecution.java:459)
        at com.facebook.presto.$gen.Presto_0_280_a95c1b4____20230408_091743_1.run(Unknown Source)
        at com.facebook.presto.execution.SqlQueryManager.createQuery(SqlQueryManager.java:306)
        at com.facebook.presto.dispatcher.LocalDispatchQuery.lambda$startExecution$8(LocalDispatchQuery.java:211)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)