trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
https://trino.io
Apache License 2.0
10.49k stars 3.02k forks source link

Cached delta log gets corrupted when dropping and recreating delta table with Trino #21451

Closed sdaberdaku closed 1 month ago

sdaberdaku commented 7 months ago

In Trino 444 with Alluxio cache enabled, when dropping and then recreating a delta table, I occasionally get the following error:

io.trino.spi.TrinoException: Error getting snapshot for my_schema.my_table
    at io.trino.plugin.deltalake.DeltaLakeMetadata.getSnapshot(DeltaLakeMetadata.java:485)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.getTableHandle(DeltaLakeMetadata.java:547)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.getTableHandle(DeltaLakeMetadata.java:319)
    at io.trino.spi.connector.ConnectorMetadata.getTableHandle(ConnectorMetadata.java:141)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.getTableHandle(ClassLoaderSafeConnectorMetadata.java:1243)
    at io.trino.tracing.TracingConnectorMetadata.getTableHandle(TracingConnectorMetadata.java:149)
    at io.trino.metadata.MetadataManager.lambda$getTableHandle$5(MetadataManager.java:283)
    at java.base/java.util.Optional.flatMap(Optional.java:289)
    at io.trino.metadata.MetadataManager.getTableHandle(MetadataManager.java:277)
    at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(MetadataManager.java:1884)
    at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(MetadataManager.java:1876)
    at io.trino.tracing.TracingMetadata.getRedirectionAwareTableHandle(TracingMetadata.java:1475)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.getTableHandle(StatementAnalyzer.java:5843)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:2288)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:522)
    at io.trino.sql.tree.Table.accept(Table.java:60)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:541)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.analyzeFrom(StatementAnalyzer.java:4892)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:3085)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:522)
    at io.trino.sql.tree.QuerySpecification.accept(QuerySpecification.java:155)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:541)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:549)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:1566)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:522)
    at io.trino.sql.tree.Query.accept(Query.java:118)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:541)
    at io.trino.sql.analyzer.StatementAnalyzer.analyze(StatementAnalyzer.java:501)
    at io.trino.sql.analyzer.StatementAnalyzer.analyze(StatementAnalyzer.java:490)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:97)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:86)
    at io.trino.execution.SqlQueryExecution.analyze(SqlQueryExecution.java:285)
    at io.trino.execution.SqlQueryExecution.<init>(SqlQueryExecution.java:218)
    at io.trino.execution.SqlQueryExecution$SqlQueryExecutionFactory.createQueryExecution(SqlQueryExecution.java:886)
    at io.trino.dispatcher.LocalDispatchQueryFactory.lambda$createDispatchQuery$0(LocalDispatchQueryFactory.java:153)
    at io.trino.$gen.Trino_444____20240405_162025_2.call(Unknown Source)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.UncheckedIOException: Could not parse JSON
    at io.trino.plugin.base.util.JsonUtils.parseJson(JsonUtils.java:120)
    at io.trino.plugin.base.util.JsonUtils.parseJson(JsonUtils.java:75)
    at io.trino.plugin.deltalake.transactionlog.TransactionLogParser.parseJson(TransactionLogParser.java:145)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson(TransactionLogTail.java:126)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson(TransactionLogTail.java:114)
    at io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail(TransactionLogTail.java:72)
    at io.trino.plugin.deltalake.transactionlog.TableSnapshot.load(TableSnapshot.java:100)
    at io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.lambda$loadSnapshot$1(TransactionLogAccess.java:166)
    at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4938)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3576)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2318)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2191)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2081)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4019)
    at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4933)
    at io.trino.cache.EvictableCache.get(EvictableCache.java:112)
    at io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.loadSnapshot(TransactionLogAccess.java:165)
    at io.trino.plugin.deltalake.DeltaLakeMetadata.getSnapshot(DeltaLakeMetadata.java:478)
    ... 44 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 583])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 584] (through reference chain: io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry["add"]->io.trino.plugin.deltalake.transactionlog.AddFileEntry["tags"])
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:402)
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:361)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1937)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:572)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:440)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1493)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:348)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:545)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:570)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:440)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1493)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:348)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)
    at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:342)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4881)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3035)
    at io.trino.plugin.base.util.JsonUtils.parseJson(JsonUtils.java:115)
    ... 61 more
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 583])
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 584]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:585)
    at com.fasterxml.jackson.core.base.ParserBase._handleEOF(ParserBase.java:535)
    at com.fasterxml.jackson.core.base.ParserBase._eofAsNextChar(ParserBase.java:552)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2491)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:913)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:596)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:449)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)
    at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:545)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:570)
    ... 75 more

Unfortunately the error is not easy to replicate, I can drop and recreate the same table without issues multiple times.

The actual delta log files in the object storage are not corrupted, they can be read by Spark and by Trino if the coordinator is restarted.

ebyhr commented 7 months ago

cc: @findinpath @wendigo @raunaqmorarka

sdaberdaku commented 7 months ago

The problem persists also in Trino 445. We have two use cases where this issue emerges: In the first case we overwrite a Delta Table with Spark and Trino cannot read it any more. The issue is resolved if we restart the coordinator. In the second case we overwrite a Delta Table with DBT through Trino itself. Again, by restarting the coordinator the tables become readable.

jojovem commented 3 months ago

Hi all.

I´m having the same issue. Everytime i get these errors i just delete the cache data directly in the disk and it solves my problem.

My stack is: Trino (450) + DeltaLake Connector + AWS Glue

cache configuration for the catalog is pretty straighforward:

fs.cache.enabled=true
fs.cache.directories=/tmp/trino-cache/catalog
fs.cache.max-sizes=490GB

When i get the snapshot errors now and then, i just do a rm -rf /tmp/trino-cache/catalog from within the coordinator pod and it works again The disk is an EBS volume

raunaqmorarka commented 3 months ago

cc: @jkylling @Pluies

jkylling commented 3 months ago

Use cases where we can no longer assume that the commit and checkpoint files within the Delta log are immutable should likely just disable the cache on the coordinator. Having mutable commit and checkpoint files is not really addressed by the Delta protocol, and makes any concurrent access of Delta tables brittle. So it's better to avoid this, and use a unique locations every time a table is created.

sdaberdaku commented 3 months ago

Hello @jkylling,

I am definitely using the same unique and deterministic location for a given delta table (which simply depends on the delta table name). Maybe I misunderstood your response, could you please elaborate a bit more? Also, is it possible to disable caching only on the coordinator and keep it enabled on workers?

jkylling commented 3 months ago

Hello @jkylling,

I am definitely using the same unique and deterministic location for a given delta table (which simply depends on the delta table name). Maybe I misunderstood your response, could you please elaborate a bit more? Also, is it possible to disable caching only on the coordinator and keep it enabled on workers?

Sorry, I might be confusing this with another issue. Are you able to share a bit more about your environment? Which object store are you using? You mention that this happens when overwriting a table. What operation do you run on Spark or Trino when you do an overwrite? Can it ever happen that the content of a commit at path/to/my/table/_delta_log/00...00.json is different if I check it at different times?

Unfortunately, it looks like disabling the cache on the coordinator only is no longer possible.

sdaberdaku commented 3 months ago

I am running Trino in AWS EKS using EC2 nodes with NVMe support where I am mounting the cache folders (ec2 nodes are provisioned by Karpenter, and the NVMe disks are automatically mounted as RAID0 ephemeral storage).

The object store is S3 and I am using Glue as a Data Catalog.

With Spark I overwite the tables like so:

(df
 .write
 .format("delta")
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .save(path))

Regarding the the content of a commit I really have no idea.

jojovem commented 3 months ago

@jkylling It seems that the stack @sdaberdaku is using is pretty similar to mine. In my case, this problem usually happens when i delete the entire s3 bucket (including the _delta_log) to start over with a new table. I also delete the table on aws glue and re-run the catalog crawler.

It seems the cache seems to think the previous created table is still valid, so it seems to keep searching for the snapshot that do not exist anymore. Maybe it should flush the cache when this kind of error happens?

I also tried with: CALL system.flush_metadata_cache();

But it doesnt work, i have to manually delete the cache to make it work again.

jkylling commented 3 months ago

Discussed this issue with @raunaqmorarka and we will add a configuration option to disable caching of files with /_delta_log/ in their path to avoid issues with Delta tables with mutable commits.

That said, having mutable files within /_delta_log/ is an anti-pattern which is likely to bite you in other subtle ways. It's better to avoid it, for instance by adding a UUID to the location of every table, or by deleting a table while preserving its history and data.

sdaberdaku commented 2 months ago

Hello @jkylling, Any news on the configuration option to disable delta log file caching?

Thanks,

Sebastian

jkylling commented 2 months ago

Hi @sdaberdaku,

No one has had time to look into this yet. If you want to contribute this yourself I'd be happy to give some guidance.

Basically, a new https://github.com/trinodb/trino/blob/c8568a9ccfcf2876ef441588a6040270d82f95b2/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java must be added which returns an empty cache key for any file within /_delta_log/. Enabling of this CacheKeyProvider would be controlled by some new configuration option defined within https://github.com/trinodb/trino/blob/c8568a9ccfcf2876ef441588a6040270d82f95b2/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java#L45, and used to select the provider at https://github.com/trinodb/trino/blob/c8568a9ccfcf2876ef441588a6040270d82f95b2/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java#L154

Pluies commented 2 months ago

Alternatively, you can reuse code from closed PR https://github.com/trinodb/trino/pull/21131 and use skip-paths to skip over file paths containing /_delta_log 👍

sdaberdaku commented 2 months ago

Thank you @jkylling and @Pluies for the valuable suggestions! I will look into this and try my best to submit a PR!

sdaberdaku commented 2 months ago

Hello @jkylling,

I implemented your suggestions and submitted a PR (I also submitted the cla). Would you be willing to be a reviewer?

Best,

Sebastian