trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Delta: Internal Active files cache is not being cleared when table is dropped and recreated externally #13737

Open findinpath opened 2 years ago

findinpath commented 2 years ago

Scenario:

// Create the table in either Spark or Trino
// Select the contents of the table in Trino (this fills the active files cache)
// Drop the table via Spark (and, if the table is external, remove its content from storage)
// Recreate the table via Spark
// Select the contents of the table in Trino

Stacktrace:

Suppressed: java.lang.Exception: Query: SELECT * FROM test_dl_cached_table_files_refres_134tdx8hs9wh
tests               |       at io.trino.tempto.query.JdbcQueryExecutor.executeQueryNoParams(JdbcQueryExecutor.java:136)
tests               |       at io.trino.tempto.query.JdbcQueryExecutor.execute(JdbcQueryExecutor.java:112)
tests               |       at io.trino.tempto.query.JdbcQueryExecutor.executeQuery(JdbcQueryExecutor.java:84)
tests               |       at io.trino.tests.product.utils.QueryExecutors$1.lambda$executeQuery$0(QueryExecutors.java:60)
tests               |       at net.jodah.failsafe.Functions.lambda$get$0(Functions.java:48)
tests               |       at net.jodah.failsafe.RetryPolicyExecutor.lambda$supply$0(RetryPolicyExecutor.java:62)
tests               |       at net.jodah.failsafe.Execution.executeSync(Execution.java:129)
tests               |       at net.jodah.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:376)
tests               |       at net.jodah.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:67)
tests               |       at io.trino.tests.product.utils.QueryExecutors$1.executeQuery(QueryExecutors.java:60)
tests               |       at io.trino.tests.product.deltalake.TestDeltaLakeActiveFilesCache.testSelectFromTrinoShouldRefreshTheFilesCacheWhenTableIsRecreated(TestDeltaLakeActiveFilesCache.java:79)
tests               |       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
tests               |       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
tests               |       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
tests               |       at java.base/java.lang.reflect.Method.invoke(Method.java:568)
tests               |       at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
tests               |       at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
tests               |       at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
tests               |       at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
tests               |       at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
tests               |       at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
tests               |       ... 3 more
tests               | Caused by: io.trino.spi.TrinoException: Error reading tail from s3://presto-ci-test/databricks-compatibility-test-test_dl_cached_table_files_refres_134tdx8hs9wh/20220818_191218_00001_8gf3d-b46196c7-491b-4472-8b7e-306590072d32 with length 202
tests               |   at io.trino.plugin.hive.parquet.HdfsParquetDataSource.readTailInternal(HdfsParquetDataSource.java:67)
tests               |   at io.trino.parquet.AbstractParquetDataSource.readTail(AbstractParquetDataSource.java:90)
tests               |   at io.trino.parquet.reader.MetadataReader.readFooter(MetadataReader.java:94)
tests               |   at io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createPageSource(ParquetPageSourceFactory.java:213)
tests               |   at io.trino.plugin.deltalake.DeltaLakePageSourceProvider.createPageSource(DeltaLakePageSourceProvider.java:170)
tests               |   at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider.createPageSource(ClassLoaderSafeConnectorPageSourceProvider.java:49)
tests               |   at io.trino.split.PageSourceManager.createPageSource(PageSourceManager.java:62)
tests               |   at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:308)
tests               |   at io.trino.operator.Driver.processInternal(Driver.java:410)
tests               |   at io.trino.operator.Driver.lambda$process$10(Driver.java:313)
tests               |   at io.trino.operator.Driver.tryWithLock(Driver.java:703)
tests               |   at io.trino.operator.Driver.process(Driver.java:305)
tests               |   at io.trino.operator.Driver.processForDuration(Driver.java:276)
tests               |   at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:737)
tests               |   at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:164)
tests               |   at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:490)
tests               |   at io.trino.$gen.Trino_393_5_g41d9749____20220818_191146_2.run(Unknown Source)
tests               |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
tests               |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
tests               |   at java.base/java.lang.Thread.run(Thread.java:833)

Cause: the table's active files cache in https://github.com/trinodb/trino/blob/6310e5e1415fe0bc89444016fc516640c3a3fcbb/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java is not invalidated when the table is dropped and recreated externally, so the SELECT statement looks for data files that no longer exist.
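To make the failure mode concrete, here is a minimal Java sketch of a cache keyed only by table location. The class name, the loader function and the assumption that the key is just the location are illustrative; this is a deliberately simplified model, not the actual TransactionLogAccess code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of an active files cache keyed only by table location.
// It is not Trino's TransactionLogAccess; it only illustrates the stale-entry failure mode.
public class LocationKeyedFilesCache
{
    private final Map<String, List<String>> activeFilesByLocation = new HashMap<>();
    // Reads the list of active data files for a table location from storage
    private final Function<String, List<String>> loader;

    public LocationKeyedFilesCache(Function<String, List<String>> loader)
    {
        this.loader = loader;
    }

    public List<String> getActiveFiles(String tableLocation)
    {
        // Only the location is part of the key: a table dropped and recreated at the same
        // location hits the entry loaded for the old table, so the old file names are returned
        return activeFilesByLocation.computeIfAbsent(tableLocation, loader);
    }
}
```

If the first SELECT loads part-old.parquet and the table is then recreated externally with only part-new.parquet, getActiveFiles keeps returning the old file name, which is the kind of reference to a deleted file behind the "Error reading tail" failure in the stack trace.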

findinpath commented 2 years ago
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.BeforeTestWithContext;
import org.testng.annotations.Test;

import java.util.List;

import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
import static io.trino.tests.product.utils.QueryExecutors.onDelta;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;

public class TestDeltaLakeActiveFilesCache
        extends BaseTestDeltaLakeS3Storage
{
    @Inject
    @Named("s3.server_type")
    private String s3ServerType;

    private AmazonS3 s3;

    @BeforeTestWithContext
    public void setup()
    {
        super.setUp();
        s3 = new S3ClientFactory().createS3Client(s3ServerType);
    }

    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
    public void testSelectFromTrinoShouldRefreshTheFilesCacheWhenTableIsRecreated()
    {
        String tableName = "test_dl_cached_table_files_refres_" + randomTableSuffix();
        String tableDirectory = "databricks-compatibility-test-" + tableName;

        onTrino().executeQuery(format("CREATE TABLE delta.default.%s (col INT) WITH (location = 's3://%s/%s')",
                tableName,
                bucketName,
                tableDirectory));

        onTrino().executeQuery("INSERT INTO " + tableName + " VALUES 1");
        // Add the files of the table in the active files cache
        assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1));

        // Recreate the table outside of Trino to avoid updating the Trino table active files cache
        onDelta().executeQuery("DROP TABLE default." + tableName);
        // Delete the contents of the table explicitly from storage (because it has been created as `EXTERNAL`)
        deleteDirectory(tableDirectory);

        onDelta().executeQuery(format("CREATE TABLE default.%s (col INTEGER) USING DELTA LOCATION 's3://%s/%s'",
                tableName,
                bucketName,
                tableDirectory));
        onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES 2");

        assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(2));

        onTrino().executeQuery("DROP TABLE " + tableName);
    }

    private void deleteDirectory(String tableDirectory)
    {
        // listObjects returns at most 1000 keys per call, so page through the whole listing.
        // The trailing slash restricts the listing to the table directory itself.
        ObjectListing objectListing = s3.listObjects(bucketName, tableDirectory + "/");
        while (true) {
            List<S3ObjectSummary> objectSummaries = objectListing.getObjectSummaries();
            // Avoid sending a DeleteObjects request with no keys
            if (!objectSummaries.isEmpty()) {
                String[] keys = objectSummaries.stream()
                        .map(S3ObjectSummary::getKey)
                        .toArray(String[]::new);
                s3.deleteObjects(new DeleteObjectsRequest(bucketName).withKeys(keys));
            }
            if (!objectListing.isTruncated()) {
                break;
            }
            objectListing = s3.listNextBatchOfObjects(objectListing);
        }
    }
}
findinpath commented 2 years ago

We should probably introduce a more complex key in the caches used in TransactionLogAccess

https://github.com/trinodb/trino/blob/6310e5e1415fe0bc89444016fc516640c3a3fcbb/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java#L94-L95

cc: @alexjo2144

findinpath commented 2 years ago

Workaround:

Wait for the period set by delta.metadata.live-files.cache-ttl (about 30 minutes by default). Once it elapses, the table's entry is evicted from the active files cache, which should work around the problem.
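Assuming the active files cache expires entries a fixed time after they are written, with that time taken from delta.metadata.live-files.cache-ttl, the window during which Trino sees stale files is bounded by the TTL. The hypothetical class below models this with an injectable nanosecond clock so the behaviour can be checked deterministically; it is a sketch, not Trino's cache implementation.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch of an expire-after-write cache: it shows why waiting for the
// configured TTL clears a stale entry. Not Trino's actual cache implementation.
public class TtlFilesCache
{
    private record Entry(List<String> files, long loadedAtNanos) {}

    private final Map<String, Entry> entries = new HashMap<>();
    private final Function<String, List<String>> loader;
    private final long ttlNanos;
    // Injectable clock (nanoseconds), e.g. System::nanoTime in production
    private final LongSupplier clockNanos;

    public TtlFilesCache(Function<String, List<String>> loader, Duration ttl, LongSupplier clockNanos)
    {
        this.loader = loader;
        this.ttlNanos = ttl.toNanos();
        this.clockNanos = clockNanos;
    }

    public List<String> getActiveFiles(String tableLocation)
    {
        long now = clockNanos.getAsLong();
        Entry entry = entries.get(tableLocation);
        // Reload when there is no entry yet or the entry is at least TTL old
        if (entry == null || now - entry.loadedAtNanos() >= ttlNanos) {
            entry = new Entry(loader.apply(tableLocation), now);
            entries.put(tableLocation, entry);
        }
        return entry.files();
    }
}
```

With a 30-minute TTL, a lookup 29 minutes after the first load still returns the old file list, while a lookup at the 30-minute mark reloads it from storage.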

homar commented 2 years ago

I am pretty sure that several months ago we had a very similar issue, and we decided that recreating a table in the same location is not valid behaviour and that we don't want to support it.

findinpath commented 2 years ago

Suggestion from @findepi: use the table ID in the key of the cached files.

findepi commented 2 years ago

use table id in the key of the cached files

Except that I don't know how to re-validate the table id without re-reading the table metadata from disk.
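The suggestion and the objection can both be seen in a small Java sketch that keys the cache by (location, table ID). All names here are hypothetical. In Delta Lake the table ID is the id field of the metaData action in the transaction log; the currentTableIdReader function stands in for reading it, which is exactly the metadata read findepi would like to avoid on every lookup.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: key the active files cache by (location, table ID) instead of
// location alone. Not Trino's implementation; names are illustrative.
public class TableIdKeyedFilesCache
{
    public record ActiveFilesKey(String tableLocation, String tableId) {}

    private final Map<ActiveFilesKey, List<String>> activeFiles = new HashMap<>();
    // Resolves the current table ID for a location. Building the key requires this call,
    // i.e. a metadata read, before the cache can even be consulted.
    private final Function<String, String> currentTableIdReader;
    // Loads the active data files for a specific (location, table ID) pair
    private final Function<ActiveFilesKey, List<String>> loader;

    public TableIdKeyedFilesCache(
            Function<String, String> currentTableIdReader,
            Function<ActiveFilesKey, List<String>> loader)
    {
        this.currentTableIdReader = currentTableIdReader;
        this.loader = loader;
    }

    public List<String> getActiveFiles(String tableLocation)
    {
        ActiveFilesKey key = new ActiveFilesKey(tableLocation, currentTableIdReader.apply(tableLocation));
        // A recreated table gets a new ID, so it misses the entry cached for the old table
        return activeFiles.computeIfAbsent(key, loader);
    }
}
```

Recreating the table changes its ID, so the lookup misses and reloads; the entry for the old ID stays in the map until something evicts it. The open question remains how to obtain the current ID cheaply.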