linkedin / openhouse

Open Control Plane for Tables in Data Lakehouse
https://www.openhousedb.org/
BSD 2-Clause "Simplified" License

Made Retention Operation Lightweight by removing/fixing non-metadata ops #83

Closed: rohitkum2506 closed this 2 months ago

rohitkum2506 commented 2 months ago

Summary

The deletion logic in the Retention job used a complex predicate to filter rows based on partitionColumns for string-partitioned tables. This caused two problems with the delete ops:

  1. Snapshots were added with the overwrite operation type.
  2. Data files were scanned as well, since predicate pushdown was not happening, as is evident from the query plans below: the filter in BatchScan is empty, and the line above BatchScan carries a filter with a SQL function.
spark.sql("explain extended select count(*) from `u_pageai`.`pages_representatives_v1` WHERE to_date(substring(datepartition, 0, CHAR_LENGTH('yyyy-MM-dd')), 'yyyy-MM-dd') < date_trunc('DAY', current_timestamp() - INTERVAL 3 DAYs)").show(200, false)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)], output=[count(1)#190L])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#224]
      +- HashAggregate(keys=[], functions=[partial_count(1)], output=[count#193L])
         +- Project
            +- Filter (cast(cast(gettimestamp(substring(datepartition#184, 0, 10), yyyy-MM-dd, Some(UTC), false) as date) as timestamp) < 1711670400000000)
               +- BatchScan openhouse.u_pageai.pages_representatives_v1[datepartition#184] openhouse.u_pageai.pages_representatives_v1 [filters=]

The change:

  1. Updates the delete query to use a plain string comparison, which makes the delete op a metadata-only query (a sketch is shown after this list).
  2. Removes other optimizations in the Retention job that scan datasets, which had prevented the retention delete op from being a pure metadata op.
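
For illustration, a minimal sketch of the rewritten delete, reconstructed from the parsed logical plan below; the table name is the test table used in this PR, and the exact statement in the Retention job may differ:

// Sketch only: comparing the raw partition string against a constant-foldable
// cutoff keeps the predicate pushable, so Iceberg can satisfy the delete from
// partition metadata without scanning or rewriting data files.
spark.sql("""
  DELETE FROM openhouse.oh_playground.retentionNewTest
  WHERE datePartition < date_format(current_timestamp() - INTERVAL 3 DAYS, 'yyyy-MM-dd')
""")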

Query plan for the new query:

|== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('datePartition < cast('date_format(('current_timestamp() - 3 days), yyyy-MM-dd) as string))
   +- 'UnresolvedRelation [openhouse, oh_playground, retentionNewTest], [], false

== Analyzed Logical Plan ==
id: bigint, datePartition: string
Project [id#160L, datePartition#161]
+- Filter (datePartition#161 < cast(date_format(cast(current_timestamp() - 3 days as timestamp), yyyy-MM-dd, Some(UTC)) as string))
   +- SubqueryAlias openhouse.oh_playground.retentionNewTest
      +- RelationV2[id#160L, datePartition#161] openhouse.oh_playground.retentionNewTest

== Optimized Logical Plan ==
Filter (isnotnull(datePartition#161) AND (datePartition#161 < 2024-04-21))
+- RelationV2[id#160L, datePartition#161] openhouse.oh_playground.retentionNewTest

== Physical Plan ==
*(1) Project [id#160L, datePartition#161]
+- *(1) Filter (isnotnull(datePartition#161) AND (datePartition#161 < 2024-04-21))
   +- BatchScan openhouse.oh_playground.retentionNewTest[id#160L, datePartition#161] openhouse.oh_playground.retentionNewTest [filters=datePartition IS NOT NULL, datePartition < '2024-04-21']

The BatchScan node now shows the partition filter pushed down with a concrete value (filters=datePartition IS NOT NULL, datePartition < '2024-04-21'), so only the matching partitions are touched.
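
As a sanity check (illustrative only, not part of this PR), the Iceberg snapshots metadata table can confirm the delete committed as a metadata-only operation, assuming the metadata tables are reachable through the catalog:

// Illustrative: a metadata-only delete should commit a 'delete' snapshot whose
// summary reports removed data files but none added or rewritten.
spark.sql("""
  SELECT committed_at, snapshot_id, operation, summary
  FROM openhouse.oh_playground.retentionNewTest.snapshots
  ORDER BY committed_at DESC
  LIMIT 1
""").show(false)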


Testing Done

  1. Mint tests.
  2. Fixed unit tests to account for the updated query.
  3. Ran tests with a local dataset to validate that the delete query deletes records as intended.



rohitkum2506 commented 2 months ago

> Can you verify that the scan work done by the query didn't result in a full scan, and update the testing done section?

Added the query plan for the new query. HDFS logs are not available yet for the test table I am working with. I will update the testing section with the HDFS actions associated with the delete query when they become available.

jiang95-dev commented 2 months ago

General question: do we align on letting users hold accountability for the columnPattern? Unwanted deletion can easily be avoided by looking at the partitions table, which is a metadata operation and doesn't hurt performance. Why don't we want to do the check?

rohitkum2506 commented 2 months ago

> General question: do we align on letting users hold accountability for the columnPattern? Unwanted deletion can easily be avoided by looking at the partitions table, which is a metadata operation and doesn't hurt performance. Why don't we want to do the check?

@jiang95-dev Good question. It's more of an implementation gotcha. To avoid deleting a record because of an invalid pattern, we would still have to cross-validate every column value against the invalid partitions, implementation-wise something like:

substring(columnVal, 0, len(columnPattern)) IN (<list of invalid partitions>)

which again requires a predicate function.

Getting all partitions is a metadata op, but checking every data record against them would need data file reads. Happy to iterate on it if you have other ideas.
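
For context, enumerating the partitions themselves is indeed a pure metadata read; it is the per-record cross-validation that forces data file scans. An illustrative sketch, assuming the Iceberg partitions metadata table is exposed through the catalog (table name reused from the test table above):

// Illustrative: listing partitions touches only table metadata, no data files.
spark.sql("""
  SELECT `partition`, record_count, file_count
  FROM openhouse.oh_playground.retentionNewTest.partitions
""").show(false)
// Checking every row's column value against this list, however, wraps the
// column in a function again and pushes the work back into data file reads.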