
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Iceberg partition date/day equality problem when created on timestamp #6853

Closed Saranviveka closed 2 weeks ago

Saranviveka commented 1 year ago
spark.sql("""CREATE TABLE iceberg.glue_catalog.iceberg_table ( order_id bigint,
customer_id bigint, order_amount DECIMAL(10,2), category string, trans_ts Timestamp)
USING iceberg
location 's3://xxx/yyyy/iceberg_table'
PARTITIONED BY (bucket(5, order_id), date(trans_ts))
TBLPROPERTIES ('format-version' = '2') """)

spark.sql("""ALTER TABLE iceberg.glue_catalog.iceberg_table ADD PARTITION FIELD years(trans_ts)  """)
spark.sql("""ALTER TABLE iceberg.glue_catalog.iceberg_table ADD PARTITION FIELD months(trans_ts)  """)
spark.sql("""ALTER TABLE iceberg.glue_catalog.iceberg_table ADD PARTITION FIELD hours(trans_ts) """)
spark.sql("""ALTER TABLE iceberg.glue_catalog.iceberg_table ADD PARTITION FIELD category """)

spark.sql("""INSERT INTO iceberg.glue_catalog.iceberg_table
VALUES( 10001, 001, 06.17, 'soap', cast('2019-06-13 13:22:30' as timestamp) )
""")

Greater than works

trino> select * from iceberg.glue_catalog.iceberg_table where trans_ts > date'2019-06-12';
 order_id | customer_id | order_amount | category |            trans_ts
----------+-------------+--------------+----------+--------------------------------
    10001 |           1 |         6.17 | soap     | 2019-06-13 17:22:30.000000 UTC

Between works well
trino> select * from iceberg.glue_catalog.iceberg_table where trans_ts BETWEEN date'2019-06-12' and date'2019-06-14' ;
 order_id | customer_id | order_amount | category |            trans_ts
----------+-------------+--------------+----------+--------------------------------
    10001 |           1 |         6.17 | soap     | 2019-06-13 17:22:30.000000 UTC

Equality doesn't work

trino> select * from iceberg.glue_catalog.iceberg_table where trans_ts=date'2019-06-13';
 order_id | customer_id | order_amount | category | trans_ts
----------+-------------+--------------+----------+----------
(0 rows)
Fokko commented 1 year ago

Thanks for raising this @Saranviveka. Could you open up a PR in the Trino GitHub? I think this is more Trino-related.

Fokko commented 1 year ago

I took the time to look into this since it is quite a serious issue, and it seems that Spark has the same issue:

[screenshot: the same equality query in Spark also returns no rows]

This is also the case when we don't run the additional ADD PARTITION FIELD statements. They don't make a lot of sense anyway, since the table is already partitioned on date(trans_ts), which includes the year and month.
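
For reference, a minimal sketch (editor's addition, not from the thread) of a partition spec that would already be sufficient here; it reuses the table definition from this issue:

spark.sql("""CREATE TABLE iceberg.glue_catalog.iceberg_table ( order_id bigint,
customer_id bigint, order_amount DECIMAL(10,2), category string, trans_ts Timestamp)
USING iceberg
PARTITIONED BY (bucket(5, order_id), days(trans_ts))
TBLPROPERTIES ('format-version' = '2') """)

A day-granularity transform alone already supports year- and month-level pruning, since those ranges map onto contiguous runs of day partitions.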

Saranviveka commented 1 year ago

In that case, is it Iceberg that has to look into the issue, or the engines?

Fokko commented 1 year ago

I think it is Iceberg. I'm wrapping up a project, and then I'm planning to look into this. I suspect that somewhere, when we evaluate the predicate against the file statistics, we incorrectly decide to skip the file.

Saranviveka commented 1 year ago

In addition, though we can create partitions on year and month, how exactly do we query them from Trino or Athena? In Trino we have to cast to either date or timestamp; no casting is available for year, month, or day. In those cases, what's the point of having those partition functions?

rdblue commented 1 year ago

Is this a problem? I think the behavior is correct, although not necessarily intuitive.

The filter is trans_ts=date'2019-06-13' but the only trans_ts in the table is 2019-06-13 17:22:30.000000 UTC, which is NOT equal to 2019-06-13 00:00:00.000000.

I can see how this would be unexpected, because you thought that the timestamp would be converted to a date but instead the date is converted to a timestamp, and then the equals check runs.

I think this is the same in both Trino and Spark because they both cast values to the wider type. In this case, timestamp rather than date.
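
To make the widening concrete, a sketch (editor's addition, using the table from this issue):

-- What the engines effectively evaluate: the date literal is widened to a timestamp
SELECT * FROM iceberg.glue_catalog.iceberg_table
WHERE trans_ts = TIMESTAMP '2019-06-13 00:00:00';  -- 0 rows: the stored value has a time component

-- Casting the column to a date instead keeps the comparison at date granularity
SELECT * FROM iceberg.glue_catalog.iceberg_table
WHERE CAST(trans_ts AS DATE) = DATE '2019-06-13';  -- matches the row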

rdblue commented 1 year ago

@Saranviveka, the partition functions specify the granularity of partitions, not actual values. Partitioning by years breaks data up into year-sized divisions, months into month-sized divisions, etc. You only need the most granular division, which in your case is hours.

Also, you don't interact with those values directly. When you filter on your timestamp column, the needed partitions are automatically selected by Iceberg.
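
For example (a sketch added for illustration), a plain range filter on the timestamp column is enough; Iceberg maps it onto the partition transform values and prunes the non-matching files:

SELECT * FROM iceberg.glue_catalog.iceberg_table
WHERE trans_ts >= TIMESTAMP '2019-06-13 00:00:00'
  AND trans_ts < TIMESTAMP '2019-06-14 00:00:00';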

Saranviveka commented 1 year ago

@rdblue The idea of creating multi-level partitions is that a user querying the table doesn't need to know how it has been partitioned. Since a partition was created on the date of a timestamp, the expectation is that it should match an exact date value without any time component. Consider a business case: I buy a product at 11:05 am on '2023-02-06'; its timestamp would be, say, '2023-02-06 11:05:45'. Even though that is not '2023-02-06 00:00:00', when an analyst queries for everyone who bought products on '2023-02-06', my record should be displayed. If not, it's a bug!

The idea of creating/specifying a date-based partition on a timestamp is that when the corresponding date is provided without a time, the end user should get the result. If not, it's a flaw.

I am not expecting that a wrong timestamp in an equality check should match! But a correct date should. If it doesn't, then how the engine casts and compares has to be rectified, not the business expectation.

Saranviveka commented 1 year ago

When we created partitions on date, year, and month, Spark let us filter on year and month values, but they have to be used as a range.

scala> spark.sql("""select * from iceberg.catbdsql_glue_catalog.iceberg_table where trans_ts= '2019'  """).show(false)
+--------+-----------+------------+--------+--------+
|order_id|customer_id|order_amount|category|trans_ts|
+--------+-----------+------------+--------+--------+
+--------+-----------+------------+--------+--------+

scala> spark.sql("""select * from iceberg.catbdsql_glue_catalog.iceberg_table where trans_ts >= '2019' and trans_ts < '2020'  """).show(false)
+--------+-----------+------------+--------+-------------------+
|order_id|customer_id|order_amount|category|trans_ts           |
+--------+-----------+------------+--------+-------------------+
|10001   |1          |6.17        |soap    |2019-06-13 13:22:30|
+--------+-----------+------------+--------+-------------------+

scala> spark.sql("""select * from iceberg.catbdsql_glue_catalog.iceberg_table where trans_ts >= '2019-06' and trans_ts < '2019-07'  """).show(false)
+--------+-----------+------------+--------+-------------------+
|order_id|customer_id|order_amount|category|trans_ts           |
+--------+-----------+------------+--------+-------------------+
|10001   |1          |6.17        |soap    |2019-06-13 13:22:30|
+--------+-----------+------------+--------+-------------------+

Whereas on other query engines the same is not possible, as those values are validated against the data type of the field the filter applies to. In addition, some engines require explicit upfront casting for date and timestamp values. In those cases, we can't just provide 'yyyy' or 'yyyy-mm', as the engine throws errors!
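
For example (editor's sketch), the first Spark range query above has to be written in Trino with full typed literals, since Trino will not compare a bare string like '2019' against a timestamp column:

trino> select * from iceberg.glue_catalog.iceberg_table
       where trans_ts >= timestamp '2019-01-01 00:00:00'
       and trans_ts < timestamp '2020-01-01 00:00:00';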

Fokko commented 1 year ago

@Saranviveka I agree that this might be confusing, but I see @rdblue's point. The table can be queried with equality using cast(trans_ts as date) = date '2019-01-13'; then the widening is avoided. Keep in mind that it is up to the query engine how to implement this, and Spark and Trino chose to handle it the same way.

The idea of creating/specifying a date-based partition on a timestamp is that when the corresponding date is provided without a time, the end user should get the result.

Iceberg partitions differ from traditional Hive partitions. If you have an hour(trans_ts) partition, then 2019-06-13 13:22:30 is truncated to 2019-06-13 13:00:00, and therefore you already have partitioning at the day, month, and year level.
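
A sketch of what that implies (editor's addition; the table name here is hypothetical):

CREATE TABLE iceberg.glue_catalog.orders_hourly ( order_id bigint, trans_ts timestamp )
USING iceberg
PARTITIONED BY (hours(trans_ts));
-- 2019-06-13 13:22:30 lands in the partition trans_ts_hour=2019-06-13-13,
-- so day-, month-, and year-sized ranges can still prune against it.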

Saranviveka commented 1 year ago

@Fokko I agree with you that we can cast the source column to date while casting the filter value. But wouldn't it be a costly operation to cast the source column? And we would have to explicitly tell end users about it, which dissolves the advantage that the end user doesn't need to know anything about partitioning.

My question is: Iceberg creates those partition folders with the exact date value when we specify a date partition. In that case, how hard would it be for the framework to handle this gracefully, rather than expecting the end user to cast the source column?

Fokko commented 1 year ago

But wouldn't it be a costly operation to cast the source column? And we would have to explicitly tell end users about it, which dissolves the advantage that the end user doesn't need to know anything about partitioning.

Iceberg should be able to handle this for you, but that is currently not the case.

[screenshot]

We have two rows, in two distinct partitions:

[screenshot: the two rows and their partition values]

When I fire up tracing, we can see that it queries both of the files:

2023-02-16T19:50:59.520 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/metadata/00002-b153fc69-e69b-489d-aff3-49ffede57be9.metadata.json 172.18.0.3        1.45ms       ↑ 169 B ↓ 3.4 KiB
2023-02-16T19:50:59.552 [200 OK] s3.HeadObject minio:9000/warehouse/wh/default/iceberg_table/metadata/snap-8884861716966779118-1-a38366f2-1636-497f-bbaf-d7a81b27d026.avro 172.18.0.5        486µs       ↑ 133 B ↓ 0 B
2023-02-16T19:50:59.556 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/metadata/snap-8884861716966779118-1-a38366f2-1636-497f-bbaf-d7a81b27d026.avro 172.18.0.5        900µs       ↑ 148 B ↓ 4.2 KiB
2023-02-16T19:50:59.565 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/metadata/a38366f2-1636-497f-bbaf-d7a81b27d026-m0.avro 172.18.0.5        895µs       ↑ 148 B ↓ 7.0 KiB
2023-02-16T19:50:59.570 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/metadata/a8e7390e-d67e-42bb-accf-dfe2f8df9885-m0.avro 172.18.0.5        1.241ms      ↑ 148 B ↓ 7.0 KiB
2023-02-16T19:50:59.643 [200 OK] s3.HeadObject minio:9000/warehouse/wh/default/iceberg_table/data/trans_ts_hour%3D2019-06-13-13/00000-0-f46a696b-d858-49cd-bb18-c4d39b3578ab-00001.parquet 172.18.0.5        413µs       ↑ 133 B ↓ 0 B
2023-02-16T19:50:59.646 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/data/trans_ts_hour%3D2019-06-13-13/00000-0-f46a696b-d858-49cd-bb18-c4d39b3578ab-00001.parquet 172.18.0.5        662µs       ↑ 148 B ↓ 8 B
2023-02-16T19:50:59.649 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/data/trans_ts_hour%3D2019-06-13-13/00000-0-f46a696b-d858-49cd-bb18-c4d39b3578ab-00001.parquet 172.18.0.5        1.309ms      ↑ 148 B ↓ 1.1 KiB
2023-02-16T19:50:59.654 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/data/trans_ts_hour%3D2019-06-13-13/00000-0-f46a696b-d858-49cd-bb18-c4d39b3578ab-00001.parquet 172.18.0.5        1.074ms      ↑ 148 B ↓ 1.5 KiB
2023-02-16T19:50:59.684 [200 OK] s3.HeadObject minio:9000/warehouse/wh/default/iceberg_table/data/trans_ts_hour%3D2019-06-14-13/00000-1-07373866-4c83-4e5a-8577-e9aa24acbfc4-00001.parquet 172.18.0.5        552µs       ↑ 133 B ↓ 0 B
2023-02-16T19:50:59.687 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/data/trans_ts_hour%3D2019-06-14-13/00000-1-07373866-4c83-4e5a-8577-e9aa24acbfc4-00001.parquet 172.18.0.5        648µs       ↑ 148 B ↓ 8 B
2023-02-16T19:50:59.690 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/data/trans_ts_hour%3D2019-06-14-13/00000-1-07373866-4c83-4e5a-8577-e9aa24acbfc4-00001.parquet 172.18.0.5        750µs       ↑ 148 B ↓ 1.1 KiB
2023-02-16T19:50:59.694 [206 Partial Content] s3.GetObject minio:9000/warehouse/wh/default/iceberg_table/data/trans_ts_hour%3D2019-06-14-13/00000-1-07373866-4c83-4e5a-8577-e9aa24acbfc4-00001.parquet 172.18.0.5        756µs       ↑ 148 B ↓ 1.5 KiB

My question is: Iceberg creates those partition folders with the exact date value when we specify a date partition. In that case, how hard would it be for the framework to handle this gracefully, rather than expecting the end user to cast the source column?

Again, this is not up to Iceberg, but up to Spark/Trino/etc. to decide how to do the comparison. See below, where the behavior is the same against a plain Spark table. If you want to change this behavior, you should raise it in the Trino/Spark communities.

[screenshot: the same equality comparison against a plain, non-Iceberg Spark table also returns no rows]

Saranviveka commented 1 year ago

@Fokko the idea of defining partitions is to reduce I/O. If, based on your findings, a query ends up scanning all the files anyway, then we don't need to bother creating partitions and can just scan the whole table directly.

This is how the file path looks: iceberg_table/data/order_id_bucket=3/trans_ts_day=2019-06-13/trans_ts_year=2019/trans_ts_month=2019-06/trans_ts_hour=2019-06-13-17/category=soap/00000-0-848b50e9-2f4d-42f0-baf6-fe543a2a4e88-00001.parquet

It is evident that the date value is used verbatim in the folder hierarchy, so why does it have to be converted to a timestamp, even when I cast the filter value to date in an equality check? Is this specific to the query engines or to Iceberg?

trino> select * from iceberg.glue_catalog.iceberg_table where trans_ts=date'2019-06-13';
 order_id | customer_id | order_amount | category | trans_ts
----------+-------------+--------------+----------+----------
(0 rows)

Saranviveka commented 1 year ago

scala> spark.sql("""CREATE TABLE iceberg.glue_catalog.iceberg_table1 ( order_id bigint,
customer_id bigint, order_amount DECIMAL(10,2), category string, trans_dt date)
USING iceberg
location 's3://xxx/glue_catalog/iceberg_table1'
PARTITIONED BY (bucket(5, order_id), trans_dt, years(trans_dt))
TBLPROPERTIES ('format-version' = '2') """)

scala> spark.sql("""ALTER TABLE iceberg.glue_catalog.iceberg_table1 ADD PARTITION FIELD months(trans_dt) """)
res29: org.apache.spark.sql.DataFrame = []

scala> spark.sql("""ALTER TABLE iceberg.glue_catalog.iceberg_table1 ADD PARTITION FIELD category """)
res30: org.apache.spark.sql.DataFrame = []

scala> spark.sql("""INSERT INTO iceberg.glue_catalog.iceberg_table1
VALUES ( 10001, 001, 06.17, 'soap', cast('2019-06-13' as date) )
""")
res31: org.apache.spark.sql.DataFrame = []

In the above example, I have defined trans_dt as a date field and enabled partitioning on trans_dt, years(trans_dt), and months(trans_dt).

This is how the file path looks: glue_catalog/iceberg_table1/data/order_id_bucket=3/trans_dt=2019-06-13/trans_dt_year=2019/trans_dt_month=2019-06/category=soap/00000-6-b4c62154-526d-4782-b9d3-9bd093b550ff-00001.parquet

scala> spark.sql("""select * from iceberg.glue_catalog.iceberg_table1 where trans_dt= '2019' """).show(false)
+--------+-----------+------------+--------+--------+
|order_id|customer_id|order_amount|category|trans_dt|
+--------+-----------+------------+--------+--------+
+--------+-----------+------------+--------+--------+

scala> spark.sql("""select * from iceberg.glue_catalog.iceberg_table1 where trans_dt >= '2019' and trans_dt < '2020' """).show(false)
+--------+-----------+------------+--------+----------+
|order_id|customer_id|order_amount|category|trans_dt  |
+--------+-----------+------------+--------+----------+
|10001   |1          |6.17        |soap    |2019-06-13|
+--------+-----------+------------+--------+----------+

My question is: if we can't use year or month values directly while querying, then what's the point of creating partitions at those granularities? I would honestly like to know when and how they are actually leveraged.

Saranviveka commented 1 year ago

Team, I would like to know whether equality-check support will be considered for a date partition created on a timestamp field.

youngxinler commented 1 year ago

My question is: if we can't use year or month values directly while querying, then what's the point of creating partitions at those granularities? I would honestly like to know when and how they are actually leveraged.

IMO, partitioning on those segments is useful; it still contributes to partition pruning when filtering files. I also think that for a date equality comparison, the value should be converted first, otherwise the equality condition is ambiguous. It's better to use from_unixtime(ts / 1000, 'YYYY') = '2021' or cast(ts as date) = '20210101' to get rows for an equal date.
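
Applied to the table from this issue, the suggested rewrite would look like this (a sketch):

select * from iceberg.glue_catalog.iceberg_table where cast(trans_ts as date) = date '2019-06-13';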

JakeMcPherson25 commented 1 year ago

I am seeing a similar issue with decimal partitions with trailing zeros when using an IN predicate. They seem to get filtered out by the ResidualEvaluator. For example, given this table data structure:

s3a://warehouse/iceberg_test1/data/varchar1=test1/int1=1383779907/decimal1=1.1000
s3a://warehouse/iceberg_test1/data/varchar1=test1/int1=1383779907/decimal1=1.2000
s3a://warehouse/iceberg_test1/data/varchar1=test1/int1=1383779907/decimal1=1.2001
s3a://warehouse/iceberg_test1/data/varchar1=test1/int1=1383779907/decimal1=1.2002
s3a://warehouse/iceberg_test1/data/varchar1=test1/int1=1383779907/decimal1=1.4000

when running the predicate SELECT * FROM iceberg_test1 WHERE decimal1 IN (1.4000, 1.1000), it returns:

real1                    varchar1                                      int1                    decimal1
------------------------ --------------------------------------------- ----------------------- ---------------------------

  0 record(s) selected.

Furthermore, this works when the IN predicate contains only one value; in that situation Iceberg converts the IN to an == here: https://github.com/apache/iceberg/blob/c256604b34233885f6565902719bbfcf55e4cacf/api/src/main/java/org/apache/iceberg/expressions/UnboundPredicate.java#L223-L234, and that predicate produces the correct results.

Eliminating the decimal column from the partition expression in the CREATE TABLE statement resolves the issue as well. I understand that this is most likely a separate issue, but both problems seem extremely similar.
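
For clarity, the two cases from this comment restated side by side (a sketch; table and column names taken from the paths above):

SELECT * FROM iceberg_test1 WHERE decimal1 IN (1.4000);         -- works: a single-element IN is rewritten to ==
SELECT * FROM iceberg_test1 WHERE decimal1 IN (1.4000, 1.1000); -- returns 0 rows: reportedly filtered out by the ResidualEvaluator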

github-actions[bot] commented 8 months ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 2 weeks ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.