
Delta Lake: Improve coordinator memory consumption for large tables #13198

Closed: alexjo2144 closed this issue 10 months ago

alexjo2144 commented 2 years ago

Summing up some conversations about the Delta Lake connector with @findepi, @ebyhr, @findinpath, and @homar

The Delta Lake connector requires a large block of coordinator memory at planning time because it reads the entire Transaction Log into memory. Important pieces of the log are cached in TransactionLogAccess's tableSnapshots and activeDataFileCache caches.

The table snapshots cache is fairly small. It remembers the location of the last Parquet checkpoint and the entries written in the JSON files that follow it. It is used to answer metadata-level questions about the table without reloading files from the file system, for example in getTableHandle and streamTableColumns.

The activeDataFileCache keeps track of the path of each underlying data file. It was originally used in a number of places, but most of those have been removed; now it is only used for statistics collection and split generation.
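For context, here is a minimal sketch of how two such planning-time caches could be wired up with Guava. The class and field names mirror the description above, but the key types, value types, and sizing are illustrative assumptions, not the actual Trino code:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.List;

// Placeholder types standing in for Trino's TableSnapshot and AddFileEntry
record TableSnapshot(String lastCheckpointPath, List<String> trailingJsonEntries) {}
record AddFileEntry(String path, long sizeInBytes) {}

class TransactionLogAccessSketch
{
    // Small: per-table pointer to the last checkpoint plus the JSON commits after it.
    // Answers metadata-level questions without re-reading the file system.
    private final Cache<String, TableSnapshot> tableSnapshots =
            CacheBuilder.newBuilder().maximumSize(1_000).build();

    // Potentially huge: the full list of active data files per table.
    // This is the cache that dominates coordinator memory for large tables.
    private final Cache<String, List<AddFileEntry>> activeDataFileCache =
            CacheBuilder.newBuilder().maximumSize(1_000).build();
}
```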

Proposal 1: Iteratively process the Parquet checkpoint file

Relies on https://github.com/trinodb/trino/pull/12196 / https://github.com/trinodb/trino/pull/13047

The general idea here is to iteratively build stats from the Transaction Log files on the file system each time getTableStatistics is called, and then fetch the files once more for split generation. This would reduce the amount of data held in memory at the cost of fetching files from the file system several times during planning. Loading the files during split generation is probably not a big hit, since the query is already running at that point. (A sketch follows the pros/cons below.)

This is similar to how Iceberg works now when you call planTasks/planFiles

Pros - Lower memory footprint on the coordinator
Cons - Requires going to the file system multiple times per query
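A minimal sketch of the iterative approach, assuming a streaming reader along the lines of what the PRs above enable. CheckpointEntryIterator, AddFileEntry, and TableStats are hypothetical names for illustration:

```java
import java.util.Iterator;

// Hypothetical types for illustration; not the actual Trino classes
record AddFileEntry(String path, long sizeInBytes) {}
record TableStats(long fileCount, long totalSizeInBytes) {}

// Streams entries out of the checkpoint Parquet file one at a time
interface CheckpointEntryIterator extends Iterator<AddFileEntry>, AutoCloseable {}

class IterativeStatsSketch
{
    // Single pass over the checkpoint: stats are accumulated without ever
    // materializing the full list of active files in memory
    static TableStats buildStats(CheckpointEntryIterator entries) throws Exception
    {
        long fileCount = 0;
        long totalSize = 0;
        try (entries) {
            while (entries.hasNext()) {
                AddFileEntry entry = entries.next();
                fileCount++;
                totalSize += entry.sizeInBytes();
                // per-column min/max/null-count merging would go here
            }
        }
        return new TableStats(fileCount, totalSize);
    }
}
```

The same pass would then run again at split generation time, which is the file-system round-trip cost noted in the cons.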

Proposal 1a: Thread-level parallelism for reads from the Parquet checkpoint

To mitigate the fact that we're reading the files from disk rather than from memory, we could compute statistics in a more parallel way: HiveMetastoreBackedDeltaLakeMetastore#getTableStatistics could use a thread pool to accumulate stats for individual sections of the Parquet checkpoint in parallel and then merge them to form the table stats.
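A sketch of what that could look like, assuming the checkpoint can be carved into independently readable sections (e.g. Parquet row groups); all names below are hypothetical:

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelStatsSketch
{
    // Hypothetical handle to one independently readable slice of the checkpoint
    record CheckpointSection(long fileCount, long totalSize) {}

    record PartialStats(long fileCount, long totalSize)
    {
        PartialStats merge(PartialStats other)
        {
            return new PartialStats(fileCount + other.fileCount, totalSize + other.totalSize);
        }
    }

    static PartialStats computeStats(List<CheckpointSection> sections)
            throws InterruptedException, ExecutionException
    {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            // Fan out: one task per checkpoint section
            List<Future<PartialStats>> futures = sections.stream()
                    .map(section -> pool.submit(() -> statsForSection(section)))
                    .toList();
            // Fan in: merge the partial results into table-level stats
            PartialStats merged = new PartialStats(0, 0);
            for (Future<PartialStats> future : futures) {
                merged = merged.merge(future.get());
            }
            return merged;
        }
        finally {
            pool.shutdown();
        }
    }

    static PartialStats statsForSection(CheckpointSection section)
    {
        // Read only this slice of the Parquet checkpoint and accumulate stats
        return new PartialStats(section.fileCount(), section.totalSize());
    }
}
```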

Proposal 2: Accumulate min/max stats during ANALYZE

We already keep extra NDV stats in a sidecar file; we could use the same approach to store min/max/null-count stats for each partition during ANALYZE and avoid walking the whole Transaction Log to calculate stats once a table has been analyzed. (A sketch of such a sidecar follows the pros/cons below.)

Pros - Removes the need to go to the transaction log for getTableStatistics
Cons - Requires running ANALYZE periodically or stats will get stale
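To make the idea concrete, here is a hypothetical shape for one sidecar entry; the field names and layout are illustrative only, not the existing NDV sidecar format:

```json
{
  "partition": "date=2022-07-01",
  "columnStats": {
    "order_id": {"min": 1, "max": 982345, "nullCount": 0, "ndv": 950000},
    "status": {"min": "CANCELLED", "max": "SHIPPED", "nullCount": 12, "ndv": 4}
  }
}
```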

Proposal 3: Move checkpoint reading to Workers

Rather than distributing individual files, the coordinator could assign workers sections of the Transaction Log to process, and it would be each worker's job to derive the list of active files from its section of the log. This does not fix the need for min/max stats, but combined with Proposal 2 it could avoid opening the transaction log on the coordinator at all. (See the sketch after the pros/cons below.)

Pros - Less work on the coordinator to do split generation
Cons - The coordinator/scheduler gets less insight into how much data the workers are actually reading, which makes split weights hard to calculate
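One way to picture this is a connector split that describes a slice of the log rather than a single data file, with the worker expanding it into actual work items. A sketch with hypothetical names:

```java
import java.util.List;

// Hypothetical split: points at a section of the Transaction Log instead of
// at one data file. The worker reads the assigned checkpoint row groups,
// applies the trailing JSON commits (adds/removes), and only then knows the
// set of active files it has to scan.
record TransactionLogSectionSplit(
        String tableLocation,
        String checkpointPath,
        int firstRowGroup,
        int lastRowGroup,
        List<String> trailingJsonCommits)
{
    // Because the coordinator never sees the expanded file list, it cannot
    // estimate how much data hides behind this split, which is exactly the
    // split-weight problem called out in the cons above.
}
```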

kietay commented 1 year ago

@alexjo2144 Is this a change I could work on as a first-time committer? I use Delta Lake extensively at my primary job and am very familiar with the internals.

alexjo2144 commented 1 year ago

I think it would be good to have consensus on how we should address the problem before writing much code. In the meantime, I'd take a look at the "good first issue" label.

hashhar commented 1 year ago

Alternatively, if you have specific ideas on how to improve it, it might be useful to create an issue or add a comment detailing what you propose to change; then we can arrive at consensus and move forward.

kietay commented 1 year ago

@alexjo2144 do we know where the memory pressure is happening specifically?

Also wondering if any comparison has been done with how Spark handles loading the log into the driver, and whether it sees similar pressure. Anecdotally I have not noticed similar issues on the Spark side, and IIUC the order of operations for executing a query is similar to Trino's.

Having read through the Delta plugin, it doesn't seem like Proposal 3 is viable: if there is any skew in the Parquet files, it could have a large impact on split generation.

findepi commented 1 year ago

> @alexjo2144 do we know where the memory pressure is happening specifically?

See this in the issue description:

> Important pieces of the log are cached in TransactionLogAccess's tableSnapshots and activeDataFileCache caches.

raunaqmorarka commented 10 months ago

Can we consider this resolved by the recent delta.checkpoint-filtering.enabled changes, @findepi @findinpath @ebyhr?
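For reference, checkpoint filtering is a catalog property of the Delta Lake connector, so it would be toggled in the catalog file; a minimal example, assuming a catalog named delta:

```properties
# etc/catalog/delta.properties
connector.name=delta_lake
delta.checkpoint-filtering.enabled=true
```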