linkedin / openhouse

Open Control Plane for Tables in Data Lakehouse
https://www.openhousedb.org/
BSD 2-Clause "Simplified" License

Remove date_trunc transform on a partition timestamp column in retention stmt #110

Closed: teamurko closed this 1 month ago

teamurko commented 1 month ago

Summary

This optimization enables predicate pushdown in the retention job's delete statement by removing the date_trunc transform from the left-hand side of the partition-column comparison (a sketch of the predicate change follows the plans below). Spark plan without the change:

scala> spark.sql("explain select count(*) from T where date_trunc('HOUR', datepartition) < date_trunc('HOUR', current_timestamp() - INTERVAL 72 HOURs)").show(2000, false)
...
|== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#46]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Project
            +- Filter (date_trunc(HOUR, datepartition#188, Some(UTC)) < 1716652800000000)
               +- BatchScan openhouse.T[datepartition#188] openhouse.T [filters=]

Spark plan after the change:

scala> spark.sql("explain select count(*) from T where datepartition < date_trunc('HOUR', current_timestamp() - INTERVAL 72 HOURs)").show(2000, false)
...
|== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#71]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Project
            +- Filter (isnotnull(datepartition#220) AND (datepartition#220 < 1716656400000000))
               +- BatchScan openhouse.T[datepartition#220] openhouse.T[filters=datepartition IS NOT NULL, datepartition < 1716656400000000]
scala> spark.sql("explain delete from T where datepartition < date_trunc('HOUR', current_timestamp() - INTERVAL 72 HOURs)").show(2000, false)
...
|== Physical Plan ==
ReplaceData IcebergBatchWrite(table=openhouse.T, format=ORC), org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy$$Lambda$4425/341350645@38e84b78
+- AdaptiveSparkPlan isFinalPlan=false
   +- Project [...]
      +- Sort [_file#279 ASC NULLS FIRST, _pos#280L ASC NULLS FIRST], false, 0
         +- Filter NOT ((isnotnull(datepartition#252) AND (datepartition#252 < 1716656400000000)) <=> true)
            +- ...] openhouse.T [filters=datepartition IS NOT NULL, datepartition < 1716656400000000]
               +- HashAggregate(keys=[_file#279], functions=[])
                  +- Exchange hashpartitioning(_file#279, 200), ENSURE_REQUIREMENTS, [id=#122]
                     +- HashAggregate(keys=[_file#279], functions=[])
                        +- Project [_file#279]
                           +- Filter (isnotnull(datepartition#252) AND (datepartition#252 < 1716656400000000))
                              +- ExtendedBatchScan[...] openhouse.T [filters=datepartition IS NOT NULL, datepartition < 1716656400000000]
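
For illustration, a minimal sketch of the predicate change, using a hypothetical buildRetentionPredicate helper (not the actual retention-job code): the partition column stays bare on the left-hand side, and date_trunc is applied only to the constant on the right, which is what lets the filter reach the BatchScan.

def buildRetentionPredicate(column: String, granularity: String, count: Int): String = {
  // Previously the partition column itself was wrapped in the transform:
  //   s"date_trunc('$granularity', $column) < date_trunc('$granularity', current_timestamp() - INTERVAL $count ${granularity}s)"
  // Spark cannot push that through to the Iceberg scan, so the plan shows [filters=].
  // Comparing the bare column against a truncated constant keeps the predicate pushable:
  s"$column < date_trunc('$granularity', current_timestamp() - INTERVAL $count ${granularity}s)"
}

// Example: buildRetentionPredicate("datepartition", "HOUR", 72) yields
//   "datepartition < date_trunc('HOUR', current_timestamp() - INTERVAL 72 HOURs)"
// which is the form used in the delete statement above.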

Changes

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

Ran explain on the statement; existing tests pass.
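
For reference, a minimal sketch (a hypothetical check, not part of this PR's test suite) of asserting the pushdown from a spark-shell by looking for the pushed filters on the scan node of the explain output:

// Hypothetical check: the pushed-down filter should appear on the BatchScan
// line of the physical plan once date_trunc is removed from the column side.
val plan = spark.sql(
  "explain select count(*) from T where datepartition < " +
    "date_trunc('HOUR', current_timestamp() - INTERVAL 72 HOURS)"
).collect().map(_.getString(0)).mkString("\n")

assert(plan.contains("filters=datepartition IS NOT NULL"),
  "expected the partition predicate to be pushed down to the scan")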

Additional Information

For all the boxes checked, include additional details of the changes made in this pull request.