Apache Iceberg
https://iceberg.apache.org/

`rewrite_data_files` does not respect table sort order #10346

Open bk-mz opened 1 month ago

bk-mz commented 1 month ago

Apache Iceberg version

1.5.2 (latest release)

Query engine

Spark

Please describe the bug 🐞



// let's take this partition, which has 77 files
val partitionId = 476167 // 2024-04-27 07:00:00
val sortFieldId = 44 // key='sid'
val table_name = "" // table identifier omitted

def countOverlappingRanges(): Long = {
  val df = spark.sql(s"""
    select
      string(element_at(lower_bounds, $sortFieldId)) as lower,
      string(element_at(upper_bounds, $sortFieldId)) as upper
    from $table_name.files
    where not contains(file_path, 'deletes')
      and partition.data_load_ts_hour = $partitionId
  """)

  // Count pairs of files whose [lower, upper] sid ranges overlap; the last
  // condition drops self-pairs and keeps each pair from being counted twice.
  val overlappingCount = df.as("df1")
    .crossJoin(df.as("df2"))
    .filter($"df1.upper" >= $"df2.lower" && $"df1.lower" <= $"df2.upper" && $"df1.lower" < $"df2.lower")
    .count()

  overlappingCount
}

// let's rewrite with explicit sort order
spark.sql(s"""CALL system.rewrite_data_files(table => $table_name, where => 'data_load_ts = "2024-04-27 07:00:00"', options => map('rewrite-all','true'), sort_order => 'sid ASC')""").show(false);

countOverlappingRanges

// let's rewrite w/o specifying sort order
spark.sql(s"""CALL system.rewrite_data_files(table => $table_name, where => 'data_load_ts = "2024-04-27 07:00:00"', options => map('rewrite-all','true'))""").show(false);

countOverlappingRanges

Output

scala> spark.sql("""CALL system.rewrite_data_files(table => '...', where => 'data_load_ts = "2024-04-27 07:00:00"', options => map('rewrite-all','true'), sort_order => 'sid ASC')""").show(false);
+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|80                        |73                    |9848363198           |0                      |
+--------------------------+----------------------+---------------------+-----------------------+

scala>

scala> countOverlappingRanges
res37: Long = 0

scala>

scala> // let's rewrite w/o specifying sort order

scala> spark.sql("""CALL system.rewrite_data_files(table => '...', where => 'data_load_ts = "2024-04-27 07:00:00"', options => map('rewrite-all','true'))""").show(false);
+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|73                        |81                    |9869637801           |0                      |
+--------------------------+----------------------+---------------------+-----------------------+

scala>

scala> countOverlappingRanges
res39: Long = 1385

RussellSpitzer commented 1 month ago

What plan was used when the sort order wasn't specified? We should be able to tell which sort order was applied from the plan alone, without checking any data files.

This should be easily visible in the Spark UI
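
For reference, a rough sketch (not specific to this issue) of capturing the executed plans programmatically with a QueryExecutionListener, in case the Spark UI isn't handy; whether it catches the procedure's internal write depends on how the rewrite action submits it:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val planCapture = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // Print every plan executed in this session; the AppendData plan of the
    // rewrite shows which Sort / RepartitionByExpression nodes were injected.
    println(s"=== $funcName ===\n${qe.optimizedPlan}\n${qe.executedPlan}")
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

spark.listenerManager.register(planCapture)
// run the CALL system.rewrite_data_files(...) statement here, then:
spark.listenerManager.unregister(planCapture)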

bk-mz commented 1 month ago

(screenshot attached: 2024-05-17 18:11:57)

with explicit sort

== Parsed Logical Plan ==
AppendData RelationV2[ ... 22 more fields] default_cache_iceberg.`id1` glue.table, [rewritten-file-scan-task-set-id=id1, target-file-size-bytes=187904819, use-table-distribution-and-ordering=false, path=id1], true
+- Sort [sid#155 ASC NULLS FIRST], false
   +- RepartitionByExpression [sid#155 ASC NULLS FIRST], 73
      +- RelationV2[ ... 22 more fields] default_cache_iceberg.`id1` glue.table

== Analyzed Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id1` glue.table, [rewritten-file-scan-task-set-id=id1, target-file-size-bytes=187904819, use-table-distribution-and-ordering=false, path=id1], true
+- Sort [sid#155 ASC NULLS FIRST], false
   +- RepartitionByExpression [sid#155 ASC NULLS FIRST], 73
      +- RelationV2[... 22 more fields] default_cache_iceberg.`id1` glue.table

== Optimized Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id1` glue.table, [rewritten-file-scan-task-set-id=id1, target-file-size-bytes=187904819, use-table-distribution-and-ordering=false, path=id1], true, IcebergWrite(table=glue.table, format=PARQUET), Sort [sid#155 ASC NULLS FIRST], false
+- Sort [sid#155 ASC NULLS FIRST], false
   +- RepartitionByExpression [sid#155 ASC NULLS FIRST], 73
      +- RelationV2[... 22 more fields] glue.table

== Physical Plan ==
AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$4781/1107532386@2e3397b1, IcebergWrite(table=glue.table, format=PARQUET)
+- Sort [sid#155 ASC NULLS FIRST], false, 0
   +- Exchange rangepartitioning(sid#155 ASC NULLS FIRST, 73), REPARTITION_BY_NUM, [plan_id=17]
      +- Project [... 22 more fields]
         +- BatchScan glue.table[... 22 more fields] glue.table (branch=null) [filters=, groupedBy=] RuntimeFilters: []

w/o explicit sort

== Parsed Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table, [rewritten-file-scan-task-set-id=id2, target-file-size-bytes=187904819, distribution-mode=none, path=id2], true
+- RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table

== Analyzed Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table, [rewritten-file-scan-task-set-id=id2, target-file-size-bytes=187904819, distribution-mode=none, path=id2], true
+- RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table

== Optimized Logical Plan ==
AppendData RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table, [rewritten-file-scan-task-set-id=id2, target-file-size-bytes=187904819, distribution-mode=none, path=id2], true, IcebergWrite(table=glue.table, format=PARQUET), RelationV2[... 22 more fields] default_cache_iceberg.`id2` glue.table
+- Sort [staticinvoke(class org.apache.iceberg.spark.functions.HoursFunction$TimestampToHoursFunction, IntegerType, invoke, data_load_ts#641, TimestampType, false, true, true) ASC NULLS FIRST, sid#639 ASC NULLS FIRST], false
   +- RelationV2[... 22 more fields] glue.table

== Physical Plan ==
AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$4781/1107532386@204b1e50, IcebergWrite(table=glue.table, format=PARQUET)
+- Sort [staticinvoke(class org.apache.iceberg.spark.functions.HoursFunction$TimestampToHoursFunction, IntegerType, invoke, data_load_ts#641, TimestampType, false, true, true) ASC NULLS FIRST, sid#639 ASC NULLS FIRST], false, 0
   +- Project [... 22 more fields]
      +- BatchScan glue.table[... 22 more fields] glue.table (branch=null) [filters=, groupedBy=] RuntimeFilters: []

bk-mz commented 1 month ago

Okay, I admit that calling this non-working was a bit premature.

Still, the thing is that the partition ends up with a lot of overlapping files after a rewrite that doesn't explicitly set the sort order.

In any case, data_load_ts is a hidden-partition timestamp truncated to the hour, so it is constant for all of the records within a partition.
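
If it helps, a small sketch for double-checking what the procedure falls back to when no sort_order is given (assuming the table loads through Spark3Util; the values in the comments are only what the second plan suggests, not confirmed):

import org.apache.iceberg.spark.Spark3Util

// "glue.table" is the placeholder table name from the plans above.
val icebergTable = Spark3Util.loadIcebergTable(spark, "glue.table")
println(icebergTable.spec())       // presumably partitioned by hours(data_load_ts)
println(icebergTable.sortOrder())  // presumably hours(data_load_ts) ASC, sid ASC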

RussellSpitzer commented 1 month ago

So it is definitely using the default sort order, as evidenced by the plan, but something in our sort request to Spark isn't working properly. While the two plans are different, I feel like they should both produce correctly sorted output. Probably will need to debug a bit more.
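
One way to narrow it down, as a sketch that reuses the metadata query from the report above (table_name, sortFieldId and partitionId are the placeholders defined there), is to count overlaps per file and see whether the no-sort_order rewrite produced a few locally sorted groups or fully interleaved ranges:

import spark.implicits._

val bounds = spark.sql(s"""
  select file_path,
         string(element_at(lower_bounds, $sortFieldId)) as lower,
         string(element_at(upper_bounds, $sortFieldId)) as upper
  from $table_name.files
  where not contains(file_path, 'deletes')
    and partition.data_load_ts_hour = $partitionId
""")

// For each data file, count how many other files its sid range overlaps.
bounds.as("a").crossJoin(bounds.as("b"))
  .filter($"a.file_path" =!= $"b.file_path" &&
          $"a.upper" >= $"b.lower" && $"a.lower" <= $"b.upper")
  .groupBy($"a.file_path")
  .count()
  .orderBy($"count".desc)
  .show(false)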

RussellSpitzer commented 1 month ago

A little odd that the first plan doesn't have the partitioning transform, which it probably should have...