trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL
Apache License 2.0

Cannot read Delta Lake table with a checkpoint created by the python deltalake package #18760

Status: Open, opened by rgelsi 1 year ago

rgelsi commented 1 year ago

A checkpoint created by the Python Delta Lake package (deltalake) cannot be read by Trino (version 424).

I noticed that the checkpoint Parquet file created by the Python package has a different schema, or more precisely, its fields are arranged differently than in the checkpoint created by PySpark. The content of the row containing the metadata is identical.

io.trino.spi.TrinoException: Error opening Hive split s3a://testbucket/ (offset=0, length=35109): null value in entry: [metadata, name]=null
    at io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createPageSource(
    at io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.<init>(
    at io.trino.plugin.deltalake.transactionlog.TableSnapshot.getCheckpointTransactionLogEntries(
    at io.trino.plugin.deltalake.transactionlog.TableSnapshot.getCheckpointTransactionLogEntries(
    at io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.getEntries(
    at io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.getEntries(
    at io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.getMetadataEntry(
    at io.trino.plugin.deltalake.DeltaLakeMetadata.getTableHandle(
    at io.trino.plugin.deltalake.DeltaLakeMetadata.getTableHandle(
    at io.trino.spi.connector.ConnectorMetadata.getTableHandle(
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.getTableHandle(
    at io.trino.tracing.TracingConnectorMetadata.getTableHandle(
    at io.trino.metadata.MetadataManager.lambda$getTableHandle$5(
    at java.base/java.util.Optional.flatMap(
    at io.trino.metadata.MetadataManager.getTableHandle(
    at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(
    at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(
    at io.trino.tracing.TracingMetadata.getRedirectionAwareTableHandle(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.getTableHandle(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(
    at io.trino.sql.tree.Table.accept(
    at io.trino.sql.tree.AstVisitor.process(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.analyzeFrom(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(
    at io.trino.sql.tree.QuerySpecification.accept(
    at io.trino.sql.tree.AstVisitor.process(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(
    at io.trino.sql.tree.Query.accept(
    at io.trino.sql.tree.AstVisitor.process(
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(
    at io.trino.sql.analyzer.StatementAnalyzer.analyze(
    at io.trino.sql.analyzer.StatementAnalyzer.analyze(
    at io.trino.sql.analyzer.Analyzer.analyze(
    at io.trino.sql.analyzer.Analyzer.analyze(
    at io.trino.execution.SqlQueryExecution.analyze(
    at io.trino.execution.SqlQueryExecution.<init>(
    at io.trino.execution.SqlQueryExecution$SqlQueryExecutionFactory.createQueryExecution(
    at io.trino.dispatcher.LocalDispatchQueryFactory.lambda$createDispatchQuery$0(
    at io.trino.$ Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(
    at java.base/java.util.concurrent.ThreadPoolExecutor$
    at java.base/
Caused by: java.lang.NullPointerException: null value in entry: [metadata, name]=null
    at java.base/$3ReducingSink.accept(
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(
    at java.base/
    at java.base/
    at java.base/$ReduceOp.evaluateSequential(
    at java.base/
    at java.base/
    at io.trino.parquet.reader.TrinoColumnIndexStore.loadIndexes(
    at io.trino.parquet.reader.TrinoColumnIndexStore.getOffsetIndex(
    at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.applyPredicate(
    at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.visit(
    at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.visit(
    at org.apache.parquet.filter2.predicate.Operators$UserDefined.accept(
    at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter$1.visit(
    at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter$1.visit(
    at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(
    at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.calculateRowRanges(
    at io.trino.parquet.reader.ParquetReader.calculateFilteredRowRanges(
    at io.trino.parquet.reader.ParquetReader.<init>(
    at io.trino.plugin.hive.parquet.ParquetPageSourceFactory.lambda$createPageSource$2(
    at io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createParquetPageSource(
    at io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createPageSource(
    ... 50 more

Schema of checkpoint parquet file created by the Python Delta Lake package:

 |-- metaData: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- schemaString: string (nullable = true)
 |    |-- createdTime: long (nullable = true)
 |    |-- partitionColumns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- configuration: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |    |-- format: struct (nullable = true)
 |    |    |-- provider: string (nullable = true)
 |    |    |-- options: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- protocol: struct (nullable = true)
 |    |-- minReaderVersion: integer (nullable = true)
 |    |-- minWriterVersion: integer (nullable = true)
 |-- txn: struct (nullable = true)
 |    |-- appId: string (nullable = true)
 |    |-- version: long (nullable = true)
 |-- add: struct (nullable = true)
 |    |-- path: string (nullable = true)
 |    |-- size: long (nullable = true)
 |    |-- modificationTime: long (nullable = true)
 |    |-- dataChange: boolean (nullable = true)
 |    |-- stats: string (nullable = true)
 |    |-- partitionValues: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |    |-- tags: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |    |-- stats_parsed: struct (nullable = true)
 |    |    |-- numRecords: long (nullable = true)
 |    |    |-- minValues: struct (nullable = true)
 |    |    |    |-- task_fk: string (nullable = true)
 |    |    |    |-- erp: long (nullable = true)
 |    |    |    |-- gqs: long (nullable = true)
 |    |    |    |-- timestamp: timestamp_ntz (nullable = true)
 |    |    |    |-- site: string (nullable = true)
 |    |    |-- maxValues: struct (nullable = true)
 |    |    |    |-- task_fk: string (nullable = true)
 |    |    |    |-- erp: long (nullable = true)
 |    |    |    |-- gqs: long (nullable = true)
 |    |    |    |-- timestamp: timestamp_ntz (nullable = true)
 |    |    |    |-- site: string (nullable = true)
 |    |    |-- nullCount: struct (nullable = true)
 |    |    |    |-- task_fk: long (nullable = true)
 |    |    |    |-- erp: long (nullable = true)
 |    |    |    |-- gqs: long (nullable = true)
 |    |    |    |-- timestamp: long (nullable = true)
 |    |    |    |-- site: long (nullable = true)
 |-- remove: struct (nullable = true)
 |    |-- path: string (nullable = true)
 |    |-- deletionTimestamp: long (nullable = true)
 |    |-- dataChange: boolean (nullable = true)
 |    |-- extendedFileMetadata: boolean (nullable = true)

|metaData|protocol|txn|add|remove|
|{841e8772-7925-41e6-8998-56edf8f36c35, null, null, {"type":"struct","fields":[{"name":"col1","type":"string","nullable":true,"metadata":{}},{"name":"col2","type":"long","nullable":true,"metadata":{}},{"name":"col3","type":"long","nullable":true,"metadata":{}},{"name":"col4","type":"timestamp","nullable":true,"metadata":{}},{"name":"col5","type":"string","nullable":true,"metadata":{}}]}, 1689033614464, [], {delta.logRetentionDuration -> interval 7 days}, {parquet, {}}}|null|null|null|null|

Schema of checkpoint parquet file created by PySpark:

 |-- txn: struct (nullable = true)
 |    |-- appId: string (nullable = true)
 |    |-- version: long (nullable = true)
 |    |-- lastUpdated: long (nullable = true)
 |-- add: struct (nullable = true)
 |    |-- path: string (nullable = true)
 |    |-- partitionValues: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |    |-- size: long (nullable = true)
 |    |-- modificationTime: long (nullable = true)
 |    |-- dataChange: boolean (nullable = true)
 |    |-- tags: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |    |-- deletionVector: struct (nullable = true)
 |    |    |-- storageType: string (nullable = true)
 |    |    |-- pathOrInlineDv: string (nullable = true)
 |    |    |-- offset: integer (nullable = true)
 |    |    |-- sizeInBytes: integer (nullable = true)
 |    |    |-- cardinality: long (nullable = true)
 |    |    |-- maxRowIndex: long (nullable = true)
 |    |-- stats: string (nullable = true)
 |-- remove: struct (nullable = true)
 |    |-- path: string (nullable = true)
 |    |-- deletionTimestamp: long (nullable = true)
 |    |-- dataChange: boolean (nullable = true)
 |    |-- extendedFileMetadata: boolean (nullable = true)
 |    |-- partitionValues: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |    |-- size: long (nullable = true)
 |    |-- deletionVector: struct (nullable = true)
 |    |    |-- storageType: string (nullable = true)
 |    |    |-- pathOrInlineDv: string (nullable = true)
 |    |    |-- offset: integer (nullable = true)
 |    |    |-- sizeInBytes: integer (nullable = true)
 |    |    |-- cardinality: long (nullable = true)
 |    |    |-- maxRowIndex: long (nullable = true)
 |-- metaData: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- format: struct (nullable = true)
 |    |    |-- provider: string (nullable = true)
 |    |    |-- options: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |-- schemaString: string (nullable = true)
 |    |-- partitionColumns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- configuration: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |    |-- createdTime: long (nullable = true)
 |-- protocol: struct (nullable = true)
 |    |-- minReaderVersion: integer (nullable = true)
 |    |-- minWriterVersion: integer (nullable = true)
 |    |-- readerFeatures: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- writerFeatures: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

|txn|add|remove|metaData|protocol|
|null|null|null|{841e8772-7925-41e6-8998-56edf8f36c35, null, null, {parquet, {}}, {"type":"struct","fields":[{"name":"col1","type":"string","nullable":true,"metadata":{}},{"name":"col2","type":"long","nullable":true,"metadata":{}},{"name":"col3","type":"long","nullable":true,"metadata":{}},{"name":"col4","type":"timestamp","nullable":true,"metadata":{}},{"name":"col5","type":"string","nullable":true,"metadata":{}}]}, [], {delta.logRetentionDuration -> interval 7 days}, 1689033614464}|null|

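To make the differences between the two schemas above easier to see, here is a minimal Python sketch that compares their field lists. The field names are transcribed by hand from the printed schemas, and `diff_fields`, `PYTHON_CHECKPOINT`, and `SPARK_CHECKPOINT` are hypothetical names introduced for illustration. It only summarizes the listings; it does not establish which difference, if any, causes the Trino error.

```python
# Field names transcribed by hand from the two printed checkpoint schemas.
PYTHON_CHECKPOINT = {
    "metaData": ["id", "name", "description", "schemaString", "createdTime",
                 "partitionColumns", "configuration", "format"],
    "protocol": ["minReaderVersion", "minWriterVersion"],
    "txn": ["appId", "version"],
    "add": ["path", "size", "modificationTime", "dataChange", "stats",
            "partitionValues", "tags", "stats_parsed"],
    "remove": ["path", "deletionTimestamp", "dataChange", "extendedFileMetadata"],
}

SPARK_CHECKPOINT = {
    "txn": ["appId", "version", "lastUpdated"],
    "add": ["path", "partitionValues", "size", "modificationTime", "dataChange",
            "tags", "deletionVector", "stats"],
    "remove": ["path", "deletionTimestamp", "dataChange", "extendedFileMetadata",
               "partitionValues", "size", "deletionVector"],
    "metaData": ["id", "name", "description", "format", "schemaString",
                 "partitionColumns", "configuration", "createdTime"],
    "protocol": ["minReaderVersion", "minWriterVersion", "readerFeatures",
                 "writerFeatures"],
}


def diff_fields(first, second):
    """Compare two ordered lists of field names.

    Returns (only_in_first, only_in_second, same_order), where same_order
    tells whether the fields present in both lists appear in the same order.
    """
    shared = set(first) & set(second)
    only_first = sorted(set(first) - shared)
    only_second = sorted(set(second) - shared)
    same_order = [f for f in first if f in shared] == [f for f in second if f in shared]
    return only_first, only_second, same_order


if __name__ == "__main__":
    # Top-level struct order, then each struct's own fields.
    print("top-level:", diff_fields(list(PYTHON_CHECKPOINT), list(SPARK_CHECKPOINT)))
    for struct in PYTHON_CHECKPOINT:
        print(struct + ":", diff_fields(PYTHON_CHECKPOINT[struct], SPARK_CHECKPOINT[struct]))
```

With these lists, `metaData` has the same fields in a different order, while `add` differs in both content (`stats_parsed` only in the Python checkpoint, `deletionVector` only in the PySpark one) and order.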
ebyhr commented 1 year ago

Can you attach the table directory as an archive instead of text so that we can reproduce the issue easily? Also, can the issue be reproduced with Spark SQL?

rgelsi commented 1 year ago

Code to reproduce:

from deltalake import DeltaTable
from deltalake.writer import write_deltalake
import pandas as pd

path = "/path/to/"

data = {
    'col1': ['a', 'b'],
    'col2': [1, 2]
}
df = pd.DataFrame.from_dict(data)
write_deltalake(path, df)

dt = DeltaTable(path)
dt.create_checkpoint()  # the checkpoint written here is the one Trino fails to read
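To inspect the checkpoint written by the reproduction, it helps to know its file name. The sketch below assumes the classic Delta Lake protocol naming: `_delta_log/_last_checkpoint` is a small JSON file whose `version` field (plus an optional `parts` field for multi-part checkpoints) identifies the latest checkpoint. The helper name `checkpoint_files` is hypothetical, and newer V2 (UUID-named) checkpoints are not handled.

```python
import json
import os


def checkpoint_files(table_path):
    """Return the checkpoint Parquet path(s) named by _delta_log/_last_checkpoint.

    Covers classic single-part and multi-part checkpoints only.
    """
    log_dir = os.path.join(table_path, "_delta_log")
    with open(os.path.join(log_dir, "_last_checkpoint")) as f:
        last = json.load(f)
    version = last["version"]
    parts = last.get("parts")
    if parts is None:
        # Single-part checkpoint: the version is zero-padded to 20 digits.
        names = [f"{version:020d}.checkpoint.parquet"]
    else:
        # Multi-part checkpoint: part number and part count padded to 10 digits.
        names = [f"{version:020d}.checkpoint.{i:010d}.{parts:010d}.parquet"
                 for i in range(1, parts + 1)]
    return [os.path.join(log_dir, name) for name in names]
```

For example, `checkpoint_files("/path/to/")` returns the checkpoint path(s), which can then be passed to `pyarrow.parquet.read_schema` to print the file's schema.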

I can read the table with Spark SQL without any problems.

Also, if I generate a checkpoint with Spark after creating the checkpoint through the Python package, the table can be read again with Trino.

ebyhr commented 1 year ago

Thanks, I could reproduce the issue on my laptop. Looking into the details.

ebyhr commented 1 year ago

As a workaround, disable the parquet.use-column-index configuration property and restart the cluster. There is also a delta.parquet_use_column_index session property, but it is not applied when reading checkpoint Parquet files.
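The workaround amounts to one line in the Delta Lake catalog's properties file, `parquet.use-column-index=false`, followed by a cluster restart. A minimal Python sketch of that edit follows; the file location `etc/catalog/delta.properties` and the helper `set_catalog_property` are assumptions for illustration, and the helper only understands `key=value` lines (not the `:` or whitespace separators that Java properties files also allow).

```python
from pathlib import Path


def set_catalog_property(properties_file, key, value):
    """Set key=value in a catalog .properties file, replacing any existing entry.

    Hypothetical helper: only handles 'key=value' lines; comment lines
    starting with '#' or '!' are left untouched.
    """
    path = Path(properties_file)
    lines = path.read_text().splitlines() if path.exists() else []
    new_line = f"{key}={value}"
    out, replaced = [], False
    for line in lines:
        stripped = line.strip()
        is_comment = stripped.startswith(("#", "!"))
        if not is_comment and stripped.split("=", 1)[0].strip() == key:
            if not replaced:
                out.append(new_line)  # overwrite the first assignment
                replaced = True
            continue  # drop any duplicate assignments of the same key
        out.append(line)
    if not replaced:
        out.append(new_line)  # key was absent: append it
    path.write_text("\n".join(out) + "\n")
```

For example, `set_catalog_property("etc/catalog/delta.properties", "parquet.use-column-index", "false")`, then restart Trino so the catalog picks up the change. Editing the file by hand works just as well.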