Closed arunravimv closed 3 months ago
@rajeshparangi can you take a look?
I will be out of office for two weeks. I will be able to take a look once I come back.
I worry that the design for VACUUM with partition filters is incompatible with the Delta Lake protocol spec. The problem is that Delta Lake does not require the files to be distributed according to the traditional Hive style partitioning. So in theory you might have a file in directory date=2023-09-26 that is actually for partition date=2023-09-25 according to the Delta log (which is the source of truth). This design seems to require that partitions actually follow that naming scheme. Because otherwise you might delete files from a directory date=X that are actually referenced by the table but in another partition with date Y. There are also implementations that don't follow the hive naming scheme at all.
That said, the parallelization of file listing is a good idea! If we can separate that out, then it can be considered separately. Does that make sense?
@bart-samwel thank you for your comments. I understand that Delta tables need not always follow Hive-style partitioning. So far, my understanding was that this would happen in the following cases, and I believe each is handled in the code.
- Random Prefix Tables: This was not available in OSS delta and was ignored
- When Partition columns are renamed: The above logic gets the logical to physical folder name mapping from delta log and I believe it is handled.
- When Tables are liquid partitioned: The liquid partitioned tables don't have partition column spec in delta log and hence would not be able to use this feature and has graceful exit handled in the code.
It would be useful if you could point me to other cases where physical partitions like hive style will be missing.
> in theory you might have a file in directory date=2023-09-26 that is actually for partition date=2023-09-25 according to the Delta log (which is the source of truth)
Finally, to answer your point about deleting files from another directory: the source of truth for all valid files is the Delta log, and no filters are applied when building that data frame. The filter is applied only to the scan that lists all files in the table. So the subsequent left-anti join of the "all files" data frame with the "valid files" data frame protects any file referenced in the Delta log (the source of truth) from deletion.
@arunravimv
> @bart-samwel thank you for your comments. I understand that the delta tables need not always follow hive-style partitioning. So far my understanding was this would happen in the following cases and I believe that it is handled in the code.
> - Random Prefix Tables: This was not available in OSS delta and was ignored
> - When Partition columns are renamed: The above logic gets the logical to physical folder name mapping from delta log and I believe it is handled.
> - When Tables are liquid partitioned: The liquid partitioned tables don't have partition column spec in delta log and hence would not be able to use this feature and has graceful exit handled in the code.
> It would be useful if you could point me to other cases where physical partitions like hive style will be missing
I think you've probably covered the current cases pretty well. The problem is that these are current cases, but we need to consider all cases, including cases from the future that we don't know about yet but that do produce valid Delta tables.
> in theory you might have a file in directory date=2023-09-26 that is actually for partition date=2023-09-25 according to the Delta log (which is the source of truth)
> Finally to answer you about the deletion of files from another directory (>you might have a file in directory date=2023-09-26 that is actually for partition date=2023-09-25 according to the Delta log ). The source of truth for all valid files are delta log and there are no filters applied while getting this dataframe. The filter is only applied to scan for all files in the table. So the subsequent left-anti join of the "all files" data frame with the "valid files" data frame would protect from deletion of any file referenced in delta log (the source of truth)
So it would simply limit the deletions to the set of files in this folder, but it would consider the entire log, including other partitions? That sounds correct!
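To make the safety argument concrete, here is a minimal sketch using plain Python collections in place of Spark data frames. The file names and the `deletion_candidates` helper are hypothetical, and the retention threshold is deliberately left out. It only illustrates that a listing restricted to one directory, anti-joined against the unfiltered log, cannot select a file the log still references.

```python
from typing import Dict, List


def deletion_candidates(listed_paths: List[str], log_files: Dict[str, dict]) -> List[str]:
    """Left-anti join: keep listed paths that have no match in the Delta log."""
    return sorted(p for p in set(listed_paths) if p not in log_files)


# Hypothetical table state. The log is read WITHOUT any partition filter and
# maps every live file to its logical partition values.
log_files = {
    "date=2023-09-25/part-0.parquet": {"date": "2023-09-25"},
    # Physically under date=2023-09-26, but logically in partition 2023-09-25.
    "date=2023-09-26/part-1.parquet": {"date": "2023-09-25"},
    "date=2023-09-26/part-2.parquet": {"date": "2023-09-26"},
}

# The partition filter only narrows which directory gets listed.
listed = [
    "date=2023-09-26/part-1.parquet",
    "date=2023-09-26/part-2.parquet",
    "date=2023-09-26/part-9-stale.parquet",
]

candidates = deletion_candidates(listed, log_files)
print(candidates)
```

Only the unreferenced file remains a candidate, regardless of which logical partition the other files in that directory belong to.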
I think the trouble is that if we add this new VACUUM syntax, it looks like logical syntax that operates on the logical partitions of the table. As a user, I would expect this to work on the logical partitions, and I'd expect it to always work, and not only if the table happens to be written by particular clients with particular settings.
> That said, the parallelization of file listing is a good idea! If we can separate that out, then it can be considered separately. Does that make sense?
> - If there are still concerns about the filtering: improving the parallel listing could improve performance, but it won't address the long-term API cost of maintaining the Delta Lake.
> - Do you think my alternate proposal (in the design doc attached) for using cloud storage inventory to avoid listing of files would be acceptable to get around the API cost if we always have to do full listing?
FWIW I did the math on this. LIST costs $0.005 per 1000 requests. Each request returns 1000 objects. For a bucket with 100M objects, this costs $0.005 / 1000 * (100M / 1000) = $0.50. The equivalent Inventory costs $0.0025 per 1M objects listed, so it costs $0.25. So the difference is $0.25 per day. For a daily vacuum, I don't think that that price difference is worth this type of investment?
I think it's not about API cost. If you're running this from within the cloud, then the VM cost of doing the LISTing is probably the more expensive part. If you use Inventory, then you can start VACUUMing much sooner because you don't have to wait for the LISTing to return. Could we get LIST to be fast enough so that we don't need this extra complexity? Because inventory is hard to use. At standard S3 throttling rates (3500 requests per second), LIST for a 100M objects bucket will take at least 100M / 1000 / 3500 ≈ 28.6 seconds and cost $0.50. That sounds entirely acceptable for such a huge table if you run this once a day. But if you do this serially, then it's going to take a lot of time indeed. :)
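The arithmetic in the two comments above can be checked with a short sketch. The prices, page size, and request rate are the figures quoted in this thread, not verified current pricing, and the function names are made up for illustration.

```python
import math


def list_requests(objects: int, objects_per_request: int = 1000) -> int:
    """Number of LIST calls needed when each call returns one page of keys."""
    return math.ceil(objects / objects_per_request)


def list_cost_usd(objects: int, usd_per_1000_requests: float = 0.005) -> float:
    """Cost of listing every object once, at the per-1000-requests price quoted above."""
    return list_requests(objects) / 1000 * usd_per_1000_requests


def inventory_cost_usd(objects: int, usd_per_million_objects: float = 0.0025) -> float:
    """Cost of one inventory report, at the per-million-objects price quoted above."""
    return objects / 1_000_000 * usd_per_million_objects


def min_list_seconds(objects: int, requests_per_second: int = 3500) -> float:
    """Lower bound on listing time if requests run at the quoted throttling rate."""
    return list_requests(objects) / requests_per_second


objects = 100_000_000
print(f"LIST cost:      ${list_cost_usd(objects):.2f}")
print(f"Inventory cost: ${inventory_cost_usd(objects):.2f}")
print(f"Min LIST time:  {min_list_seconds(objects):.1f} s")
```

The time bound only holds if the requests are actually issued in parallel at the throttling limit; a serial listing of 100,000 pages would be far slower.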
I think we can come up with an algorithm that lists the entire key space in parallel without knowing anything about partitions or how the files are distributed. How about something that divides and conquers the LISTing space?
E.g.:
Keep a data structure that lists all the ranges that we divided the file name space into, and how far along we are with listing them. I.e., fields for startKey, endKey, lastListedKey.
1. Start with [(startKey = "", endKey = None, lastListedKey = None)] where "" sorts before everything and None means the end key that sorts after everything.
2. Pick a range and list from its lastListedKey (with a limit -- up to 1000 keys, one LIST request) and collect all the files you see that are before endKey. If you hit a value past the endKey (or the actual end of the directory), the range is completely listed. Remove it from the list.
3. Otherwise, cut the remaining space between lastListedKey and endKey in half. (See below for how.) Then split the range into two new ranges at that key. Replace the range in the data structure by the two new ranges.
You can do steps 2-3 in parallel for many ranges.
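The steps above can be sketched as follows. This is an illustration under stated assumptions, not the proposed implementation: `list_page` is a stand-in for a real LIST request against a sorted in-memory key list, the loop processes ranges one at a time where a real version would hand them to parallel workers, and `split_numeric` is a toy splitter that only works for fixed-width numeric keys (it stands in for the string midpoint discussed next).

```python
import bisect
from typing import Callable, List, Optional, Tuple

# (startKey, endKey, lastListedKey); endKey None sorts after everything.
Range = Tuple[str, Optional[str], Optional[str]]


def list_page(store: List[str], start: str, inclusive: bool, limit: int) -> List[str]:
    """Stand-in for one LIST request over a sorted key space."""
    find = bisect.bisect_left if inclusive else bisect.bisect_right
    i = find(store, start)
    return store[i:i + limit]


def list_all(store: List[str],
             split: Callable[[str, Optional[str]], Optional[str]],
             page_size: int = 1000) -> Tuple[List[str], int]:
    ranges: List[Range] = [("", None, None)]          # step 1
    found: List[str] = []
    requests = 0
    while ranges:                                      # steps 2-3, one range at a time
        start, end, last = ranges.pop()
        fresh = last is None                           # nothing listed yet in this range
        page = list_page(store, start if fresh else last, fresh, page_size)
        requests += 1
        inside = [k for k in page if end is None or k < end]
        found.extend(inside)
        if len(page) < page_size or len(inside) < len(page):
            continue                                   # end of store or past endKey: done
        last = inside[-1]
        mid = split(last, end)
        if mid is None:                                # cannot split: keep listing serially
            ranges.append((start, end, last))
        else:                                          # replace the range by two halves
            ranges.append((start, mid, last))
            ranges.append((mid, end, None))
    return sorted(found), requests


WIDTH = 6  # toy key format: zero-padded decimal strings


def split_numeric(last: str, end: Optional[str]) -> Optional[str]:
    lo, hi = int(last), (10 ** WIDTH if end is None else int(end))
    return None if hi - lo < 2 else f"{(lo + hi) // 2:0{WIDTH}d}"


store = [f"{i:0{WIDTH}d}" for i in range(0, 50_000, 7)]
found, requests = list_all(store, split_numeric, page_size=100)
print(len(found), requests)
```

In this toy run some requests land on empty ranges, which is the overhead of splitting without knowing where the keys actually are.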
So how do you cut the ranges in half? That's the trickiest bit.
E.g. if you have lastListedKey = "ab-def", endKey = "ab-h", then you first have to cut off the common prefix "ab-". The next characters are "d" and "h". You can then average the character codes, so d, e, f, g, h -> f is in the middle. So then the middle key is "ab-f".
If you have lastListedKey = "ab-de", endKey = "ab-def" then the common prefix is "ab-de" and the splitting is between suffix "" and "f". In that case it's the middle character between the starting point and f. Since f has ASCII code 102, and the first usable code is 32, that would be something like ASCII 67 (capital C), since (32 + 102) / 2 = 67.
If you have lastListedKey = "ab-dee", endKey = "ab-def", then the common prefix is "ab-de" and the splitting is between suffix "e" and "f". These characters are adjacent. So instead you have to append a character to the suffix, so "eP" or something (P is ASCII 80, which is midway between space (32) and the max regular character (127)).
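One way to turn the three cases above into code is sketched below. It assumes keys are ASCII and never contain a space, so that code 32 can act as a floor that sorts below every real character; `middle_key`, `FLOOR`, and `CEIL` are names invented for this sketch. With floor division, the second example comes out as code 67 ("C"); the exact character depends on the rounding convention and does not matter for correctness.

```python
from typing import Optional

FLOOR = 32   # space: assumed to sort below every character that appears in keys
CEIL = 128   # one past the largest ASCII code


def middle_key(last_listed: str, end_key: Optional[str]) -> str:
    """Return a key strictly between last_listed and end_key (None = no upper bound).

    Precondition: last_listed < end_key when end_key is given.
    """
    prefix = []
    bounded = end_key is not None   # True while end_key still constrains the result
    i = 0
    while True:
        lo = ord(last_listed[i]) if i < len(last_listed) else FLOOR
        hi = ord(end_key[i]) if bounded and i < len(end_key) else CEIL
        if bounded and lo == hi:
            prefix.append(chr(lo))              # still inside the common prefix
        else:
            mid = (lo + hi) // 2
            if mid > lo:                        # a character fits strictly between
                return "".join(prefix) + chr(mid)
            prefix.append(chr(lo))              # adjacent characters: keep the lower
            bounded = False                     # one; end_key no longer constrains us
        i += 1


for lo_key, hi_key in [("ab-def", "ab-h"), ("ab-de", "ab-def"), ("ab-dee", "ab-def")]:
    print(lo_key, hi_key, "->", middle_key(lo_key, hi_key))
```

The returned key does not need to exist in the bucket; it only has to sort strictly between the two bounds so that the two new ranges are disjoint and cover the old one.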
Would that work? It might take some extra LIST calls to figure out which ranges are empty vs which ranges are interesting... We could seed it with some interesting values, like a sample of the partition directories, or implementations that use random prefixes could seed them with some of the random prefixes.
Here's a much more stupid but practical idea: what if we list all the files that are currently in the log, and assume that the to-be-deleted files are distributed similarly? Then we sample 1 in every 100K files, we sort the file paths, and then we cut the space into ranges at that boundary. We start listing in parallel from each of these starting points (and from the start), and for each of them we list until we hit the next key or the end of the directory.
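A rough sketch of this sampling idea, again against an in-memory sorted key list rather than a real object store. The helper names, the thread pool, and the sample data are assumptions made for illustration; taking every Nth path of the sorted log is used here as a simple stand-in for "sample, then sort".

```python
import bisect
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional


def boundaries_from_log(log_paths: List[str], sample_every: int) -> List[str]:
    """Take every Nth path of the sorted log as a range boundary."""
    ordered = sorted(set(log_paths))
    return ordered[sample_every::sample_every]


def list_range(store: List[str], start: str, end: Optional[str],
               page_size: int = 1000) -> List[str]:
    """Page through keys from start (inclusive) until end (exclusive) or the end of the store."""
    out: List[str] = []
    i = bisect.bisect_left(store, start)
    while i < len(store):
        page = store[i:i + page_size]           # stand-in for one LIST request
        for key in page:
            if end is not None and key >= end:
                return out
            out.append(key)
        i += len(page)
    return out


def parallel_list(store: List[str], log_paths: List[str],
                  sample_every: int, workers: int = 8) -> List[str]:
    cuts = boundaries_from_log(log_paths, sample_every)
    starts = [""] + cuts                        # also list from the very start
    ends: List[Optional[str]] = cuts + [None]   # the last range runs to the end
    with ThreadPoolExecutor(max_workers=workers) as pool:
        parts = pool.map(lambda se: list_range(store, se[0], se[1]), zip(starts, ends))
        return [key for part in parts for key in part]


# Hypothetical table: files in the log plus stale files that are not in the log.
log_paths = [f"date=2023-09-{d:02d}/part-{i:04d}.parquet"
             for d in range(1, 31) for i in range(50)]
stale = [f"date=2023-09-{d:02d}/stale-{i:04d}.parquet"
         for d in range(1, 31) for i in range(10)]
store = sorted(log_paths + stale)

listed = parallel_list(store, log_paths, sample_every=100)
print(len(listed), len(store))
```

The ranges are only balanced to the extent that stale files really are distributed like the files in the log, which is the assumption this idea rests on.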
WDYT?
@bart-samwel, thank you for your quick response. I agree that the above change relies only on currently available features and mostly revolves around tables generated through the Spark client. In our situation, we are also not 100% satisfied with having to keep track of logical/physical partitions, when they are updated, and when they should be cleaned up. The above tests, run without a filtering predicate and with only the improved listing parallelism, reduce the run time significantly.
The listing of files will be significant for us because at our scale (we are still in the early days of lakehouse adoption) we have hundreds if not thousands of Delta tables (and growing) with hundreds of millions of objects. We plan to run automated maintenance of tables for our users, and Inventory reports can be reused across runs and across tables, saving a lot every month. The challenge, however, is that not all clouds offer them and there is no standard across clouds. So this could be a cloud-specific feature; what do you think? Or should we improve the parallelism logic first and then restart the conversation around the use of inventory reports?
@arunravimv
If you're going for 100 Ms of objects per table and then 100+ Delta tables, then it could indeed add up. I hadn't thought about the idea of reusing inventory reports -- would it work? You still pay per 1M objects listed, and you can only choose between daily and weekly. If you do daily, and you reuse it the next day for VACUUMing the same table, then the files that are supposed to be deleted are definitely in that list -- anything that was added in the last day isn't going to be deleted. Actually, a weekly inventory would do the trick, because the to-be-deleted data is always at least 7 days old (assuming the default 7 day retention threshold).
I don't mind adding a cloud-specific feature, but I don't know what the rest of the community thinks -- I don't know if there's precedent for this kind of thing. Maybe some other folks can chime in here as well...
But I would also definitely love to see some kind of LIST speed improvements, especially if they're not dependent on particular partitions or file layouts! That should be totally not contentious. :)
@arunravimv
> Actually, a weekly inventory would do the trick, because the to-be-deleted data is always at least 7 days old (assuming the default 7 day retention threshold).
Wait, but how would you track which files you had already deleted, if you VACUUM daily? You might be billed for a lot of failed duplicate DELETE requests.
@arunravimv Don't we parallelize listing already? Take a look at the recursiveListDirs implementation. It first does listUsingLogStore with recursive set to false so that it gets the top-level directories and creates a dataset object. Then a second iteration of the listing is done with recursive set to true. The second part is parallelized through the mapPartitions interface.
Hi @rajeshparangi, we have cases where the top-level partition folders have low cardinality (e.g. a yyyy/mm/dd/hh pattern), which causes performance issues for large tables that retain just one month of data.
> @arunravimv
> Actually, a weekly inventory would do the trick, because the to-be-deleted data is always at least 7 days old (assuming the default 7 day retention threshold).
> Wait, but how would you track which files you had already deleted, if you VACUUM daily? You might be billed for a lot of failed duplicate DELETE requests.
@bart-samwel we prepare an inventory of delta/data lake objects on a daily basis and hence each day's management jobs will be run against the latest snapshot of inventory.
> @arunravimv Don't we parallelize listing already? Take a look at the recursiveListDirs implementation. It first does listUsingLogStore with recursive set to false so that it gets top level directories and creates a dataset object. And then second iteration of list is done with recursive set to true. The second part is parallelized through mapPartitions interface.
> Hi @rajeshparangi , we have cases where top-level partition folder have fewer cardinality eg yyyy/mm/dd/hh pattern causing performance issues for large tables that have just one month retention of data.
I see. In that case, can we continue to collect top-level directories until we reach a threshold (say 200 as that is the parallelism we use) and then do file level listing. This should cover your scenario, right?
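A minimal sketch of that suggestion, assuming a breadth-first expansion: keep listing one directory level at a time, non-recursively, until there are at least `threshold` directories to hand out, then list each of those recursively in parallel. `make_lister` fakes a file system from a flat list of paths, and all names here are hypothetical rather than taken from the Delta code base.

```python
from typing import Callable, List, Tuple

Lister = Callable[[str], Tuple[List[str], List[str]]]


def make_lister(paths: List[str]) -> Lister:
    """Fake non-recursive listing over a flat list of file paths."""
    def list_children(directory: str) -> Tuple[List[str], List[str]]:
        prefix = directory + "/" if directory else ""
        subdirs, files = set(), []
        for p in paths:
            if not p.startswith(prefix):
                continue
            head, sep, _ = p[len(prefix):].partition("/")
            if sep:
                subdirs.add(prefix + head)      # something deeper lives under head
            else:
                files.append(p)                 # a file directly in this directory
        return sorted(subdirs), files
    return list_children


def collect_dirs(list_children: Lister, root: str = "",
                 threshold: int = 200) -> Tuple[List[str], List[str]]:
    """Expand directory levels until there are at least `threshold` directories.

    Returns the directories still to be listed recursively (in parallel) and
    the files already seen while expanding the upper levels.
    """
    frontier, files = [root], []
    while frontier and len(frontier) < threshold:
        next_frontier: List[str] = []
        for d in frontier:
            subdirs, direct_files = list_children(d)
            files.extend(direct_files)
            next_frontier.extend(subdirs)
        frontier = next_frontier
    return frontier, files


# One month of hourly partitions in a yyyy/mm/dd/hh layout.
paths = [f"2023/09/{d:02d}/{h:02d}/part-0.parquet"
         for d in range(1, 31) for h in range(24)]
dirs, seen_files = collect_dirs(make_lister(paths), threshold=200)
print(len(dirs), len(seen_files))
```

With this layout the single year and month directories are expanded past, and the expansion stops at the 720 hour-level directories, which gives the parallel stage enough work units. It does not by itself prevent skew if some directories hold far more files than others.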
This could improve the speed/performance. @rajeshparangi How do you suggest we do the collection if it's not partition spec-based (without introducing any skewness)? Also, this does not address the cost concerns which is why we were inclined lately to use inventory reports to get around file listing.
> @arunravimv
> Actually, a weekly inventory would do the trick, because the to-be-deleted data is always at least 7 days old (assuming the default 7 day retention threshold).
> Wait, but how would you track which files you had already deleted, if you VACUUM daily? You might be billed for a lot of failed duplicate DELETE requests.
> @bart-samwel we prepare an inventory of delta/data lake objects on a daily basis and hence each day's management jobs will be run against the latest snapshot of inventory.
That should work. I think that it would be good to add this feature!
I also think that we should keep it cloud agnostic. So here's what I would propose:
Would that work?
@bart-samwel yeap, I agree that we should keep the cloud-specific inventory format to list-of-files format (that we expect) conversion outside of the delta codebase (Maybe in Docs) and then provide the list of files as input to the vacuum command. I think instead of giving the list of files directly in the vacuum command (because there may be too many files) line by line, probably we can pass it as a Parquet file or as a subquery data frame. Then we apply safety checks on this data frame to avoid accidental deletion of files (using base path check, hidden file/folder check)
Something like
vacuum from db.table retain X hours using inventory select filename, modification_time from list_of_files
or
vacuum from db.table retain X hours using inventory <path-to-parquet-file>
What do you think? If you agree I will try to create a design doc for review and raise a feature MR.
I like it! Using a dataframe would be the most idiomatic Spark -- then the user can use an arbitrary source. (Or if we do want a file, it should of course be a Delta table, not a Parquet file. :) )
As for syntax, how about:
VACUUM FROM db.table RETAIN x HOURS USING INVENTORY FROM (query)
I'd like the parentheses around the query so that we can still add extra clauses to VACUUM at the end later. We may want to debate the "INVENTORY" keyword if it's too AWS specific.
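To illustrate the safety checks mentioned above (base path check, hidden file/folder check) together with the retention threshold, here is a small sketch over plain Python tuples instead of a data frame. The column names `filename` and `modification_time` come from the example query in this thread; the bucket paths, the helper names, and the simplified rule that any path component starting with "_" or "." is hidden are assumptions for illustration only.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Set, Tuple


def is_hidden(relative_path: str) -> bool:
    # Simplified assumption: a component starting with "_" or "." is hidden.
    return any(part.startswith(("_", ".")) for part in relative_path.split("/"))


def candidates_from_inventory(base_path: str,
                              inventory: Iterable[Tuple[str, datetime]],
                              valid_files: Set[str],
                              retain_hours: int,
                              now: datetime) -> List[str]:
    """Filter an inventory of (filename, modification_time) rows down to deletable files."""
    base = base_path.rstrip("/") + "/"
    cutoff = now - timedelta(hours=retain_hours)
    candidates = []
    for filename, modification_time in inventory:
        if not filename.startswith(base):
            continue                            # base path check
        relative = filename[len(base):]
        if is_hidden(relative) or relative in valid_files:
            continue                            # hidden, or still referenced by the log
        if modification_time < cutoff:          # older than the retention threshold
            candidates.append(filename)
    return sorted(candidates)


now = datetime(2023, 10, 10, tzinfo=timezone.utc)
old = now - timedelta(days=30)
inventory = [
    ("s3://bucket/table/date=2023-09-01/old.parquet", old),
    ("s3://bucket/table/date=2023-09-01/live.parquet", old),
    ("s3://bucket/table/date=2023-10-09/new.parquet", now - timedelta(hours=5)),
    ("s3://bucket/table/_delta_log/00000000000000000000.json", old),
    ("s3://bucket/other_table/x.parquet", old),
]
valid_files = {"date=2023-09-01/live.parquet"}

result = candidates_from_inventory("s3://bucket/table", inventory, valid_files, 168, now)
print(result)
```

Because every row is checked against the retention cutoff, an inventory that is a day or even a week old can still be used: anything it lacks is too recent to be deleted anyway, as noted earlier in the thread.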
@bart-samwel , sorry for the delay I have opened a new MR to pass list of files as a delta table or subquery please take a look.
Which Delta project/connector is this regarding?
Description
Currently, users have large tables with daily/hourly partitions spanning many years. Among all these partitions, only the recent ones are subject to change due to job reruns, corrections, and late-arriving events.
When Vacuum is run on these tables, the listing of files is performed on all the partitions and it runs for several hours or days. This duration grows as tables grow, and vacuum becomes a major overhead for customers, especially when they have hundreds or thousands of such delta tables. The file system scan takes the most time in the Vacuum operation for large tables, mostly due to the limited parallelism achievable and API throttling on the object stores.
This change does a parallel listing of partition directories (not files) using partition column info from the delta log manifest. It then constructs a data frame to apply the filtering clause. The partition predicate filtering is only supported for the latest partition structure; when there is a change in partitioning, the user should run a normal vacuum first.
The parallel listing of partitions also helps the default vacuum, as it distributes file listing when there are too many partitioning columns.
Design Doc : https://docs.google.com/document/d/1vRTfMk3bRmCWLa-E4W-UaNOgFo_DyFCcCMVjB1GzrcU/edit?usp=sharing
If this PR resolves an issue be sure to include "Resolves #1691" to correctly link and close the issue upon merge.
How was this patch tested?
Unit Testing (build/sbt 'testOnly org.apache.spark.sql.delta.DeltaVacuumSuite')
- vacuum db.table where v1=20 and v2>=3
- Delta Vacuum filtering clause expects partition predicates and only supports partitioned tables
- Predicate references non-partition column Error
End to End Test using Spark 3.4.1/3.3.1 on S3A FS (30 Executors with 25GB memory and 4 Cores)
Does this PR introduce any user-facing changes?
yes, the MR accepts an optional partition predicate filter from the user (similar to Optimize command).
VACUUM table_name [WHERE partition_predicate] [RETAIN num HOURS] [DRY RUN]
eg:
VACUUM test_db.table where year||month >= '202307' RETAIN 168 HOURS dry run