prestodb / presto


Presto can’t parse s3 path yyyy-mm-dd into timestamp type when running analyze statement #19069

Open yingsu00 opened 1 year ago

yingsu00 commented 1 year ago

Presto can’t parse the s3 path yyyy-mm-dd into the timestamp type when running an analyze statement, even though the select statement runs fine. Here's how to reproduce the issue:

CREATE TABLE glue.ng_public.kewei_timestamp_partitioned (
   "orderkey" bigint,
   "shipdate" timestamp
)
WITH (
   format = 'PARQUET',
   partitioned_by = ARRAY['shipdate']
);
insert into glue.ng_public.kewei_timestamp_partitioned values (1, from_unixtime(1));
insert into glue.ng_public.kewei_timestamp_partitioned values (1, from_unixtime(2));
-- it worked
analyze glue.ng_public.kewei_timestamp_partitioned;
-- on s3, I see 2 directories: `shipdate=1970-01-01 00%3A00%3A01.0/` and `shipdate=1970-01-01 00%3A00%3A02.0/`
-- I then manually created the s3 directory `shipdate=1970-01-01/`, moved the parquet files from the 2 directories above into the new directory, and then deleted the 2 old directories.
CALL glue.system.sync_partition_metadata('ng_public', 'kewei_timestamp_partitioned', 'DROP');
CALL glue.system.sync_partition_metadata('ng_public', 'kewei_timestamp_partitioned', 'ADD');
-- shows 2 rows
select * from glue.ng_public.kewei_timestamp_partitioned;

-- **failed with "All computed statistics must be used"**
analyze glue.ng_public.kewei_timestamp_partitioned;

As another example, when the partition column (the table has a single partition column) is of type timestamp and the s3 data directory is named shipdate=1970-01-01 00%3A00%3A01.0/, the analyze statement works fine. But we have lots of tables whose partition column type is timestamp and whose s3 data directories are named shipdate=1970-01-01 00%3A00%3A01/ (without the .0 suffix), and for those the analyze statement fails with the

com.google.common.base.VerifyException: All computed statistics must be used error.

Here’s how to reproduce it:

CREATE TABLE glue.ng_public.kewei_timestamp_partitioned2 (
   "orderkey" bigint,
   "shipdate" timestamp
)
WITH (
   format = 'PARQUET',
   partitioned_by = ARRAY['shipdate']
);
insert into glue.ng_public.kewei_timestamp_partitioned2 values (1, from_unixtime(1));
-- it worked
analyze glue.ng_public.kewei_timestamp_partitioned2;

-- on s3, I see one directory: shipdate=1970-01-01 00%3A00%3A01.0/
-- I then manually created the s3 directory shipdate=1970-01-01 00%3A00%3A01/, moved the parquet file from shipdate=1970-01-01 00%3A00%3A01.0/ into the new directory, and then deleted the old directory shipdate=1970-01-01 00%3A00%3A01.0/.

CALL glue.system.sync_partition_metadata('ng_public', 'kewei_timestamp_partitioned2', 'DROP');
CALL glue.system.sync_partition_metadata('ng_public', 'kewei_timestamp_partitioned2', 'ADD');
-- shows 1 row
select * from glue.ng_public.kewei_timestamp_partitioned2;

-- **failed with "All computed statistics must be used"**
analyze glue.ng_public.kewei_timestamp_partitioned2;

majetideepak commented 1 year ago

@yingsu00 The fix for this is to normalize other timestamp-format partition values (e.g. 1970-01-01 -> 1970-01-01 00:00:00.0) before we search the computedStatisticsMap: https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java#L1536
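
A minimal sketch of what that normalization could look like (a hypothetical helper with made-up names, not the actual patch), using java.time to parse the looser string forms and re-render them in the canonical form Presto itself writes, before the computedStatisticsMap lookup:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

public final class TimestampPartitionValues
{
    // Accepts "1970-01-01", "1970-01-01 00:00:00", and "1970-01-01 00:00:00.0" alike
    // (the value is assumed to be already URL-decoded from the partition path).
    private static final DateTimeFormatter LENIENT_PARSER = new DateTimeFormatterBuilder()
            .appendPattern("uuuu-MM-dd")
            .optionalStart()
            .appendLiteral(' ')
            .appendPattern("HH:mm:ss")
            .optionalStart()
            .appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
            .optionalEnd()
            .optionalEnd()
            .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
            .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
            .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
            .parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
            .toFormatter();

    // Canonical form seen in partition paths such as shipdate=1970-01-01 00%3A00%3A01.0
    private static final DateTimeFormatter CANONICAL_FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.S");

    private TimestampPartitionValues() {}

    public static String normalize(String partitionValue)
    {
        LocalDateTime parsed = LocalDateTime.parse(partitionValue, LENIENT_PARSER);
        return parsed.format(CANONICAL_FORMAT);
    }
}

With such a helper, normalize("1970-01-01"), normalize("1970-01-01 00:00:00"), and normalize("1970-01-01 00:00:00.0") all return 1970-01-01 00:00:00.0, so the partition value being looked up matches the key under which the computed statistics were stored.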

However, I am concerned about supporting other timestamp-format partition values for analyzing partition statistics, as requested in this issue. This can lead to incorrect results. For example, the three valid timestamp strings 1970-01-01, 1970-01-01 00:00:00, and 1970-01-01 00:00:00.0 all map to the same timestamp value 1970-01-01 00:00:00.0. In practice, three different partitions can exist for them, since partitions are keyed by their string representation. However, analyze would combine the statistics of all three partitions under the single normalized value 1970-01-01 00:00:00.0. If we support the above request, we will end up assigning the combined stats to each partition, which is incorrect.

yingsu00 commented 1 year ago

@majetideepak Thanks for bringing up the concern. This is essentially a discrepancy between what Presto considers a "partition" to be and the concept of an HDFS directory. In my understanding, the original "Hive partition" refers to an HDFS directory, so for a column shipdate of timestamp type, a folder shipdate=1970-01-01 and a folder shipdate=1970-01-01 00:00:00 are two different partitions. But Presto converts the partition values to their proper type instead of keeping them as strings. In other words, it partitions by the value in its original type, not by the value as a string.

When we create partitions and insert rows into them, the data is routed to the target partition by applying a hash function to the typed value. If you try to insert a value of a different type, the query fails:

presto:temp>  CREATE TABLE ts_partitioned
          ->  (
          ->     "orderkey" bigint,
          ->     "shipdate" timestamp
          ->  )
          ->  WITH (
          ->     format = 'PARQUET',
          ->     partitioned_by = ARRAY['shipdate']
          ->  );
CREATE TABLE
presto:temp> insert into ts_partitioned values(2, 0);
Query 20230223_015849_00004_5u5f7 failed: line 1:45: Mismatch at column 2: 'shipdate' is of type timestamp but expression is of type integer

Now let's see the following operations:


presto:temp> explain analyze insert into ts_partitioned values (1, timestamp '1969-12-31 00:00:00');
                                                                                                           Query Plan
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 1 [COORDINATOR_ONLY]
     CPU: 3.51ms, Scheduled: 5.53ms, Input: 7 rows (1.62kB); per task: avg.: 7.00 std.dev.: 0.00, Output: 1 row (9B)
     Output layout: [rows_4]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - TableCommit[Optional[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=temp, tableName=ts_partitioned, analyzePartitionValues=Optional.empty}',
             CPU: 3.00ms (23.08%), Scheduled: 5.00ms (7.35%), Output: 1 row (9B)
             Input avg.: 0.00 rows, Input std.dev.: ?%
             Collisions avg.: ? (?% est.), Collisions std.dev.: ?%
         - RemoteSource[2] => [rows:bigint, fragments:varbinary, commitcontext:varbinary, field_0:timestamp, min:bigint, max:bigint, approx_distinct:varbinary, count:bigint]
                 CPU: 1.00ms (7.69%), Scheduled: 1.00ms (1.47%), Output: 7 rows (1.62kB)
                 Input avg.: 7.00 rows, Input std.dev.: 0.00%

 Fragment 2 [ROUND_ROBIN]
     CPU: 17.26ms, Scheduled: 67.10ms, Input: 1 row (18B); per task: avg.: 0.25 std.dev.: 0.43, Output: 7 rows (1.62kB)
     Output layout: [rows, fragments, commitcontext, field_0, min, max, approx_distinct, count]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - TableWriterMerge => [rows:bigint, fragments:varbinary, commitcontext:varbinary, field_0:timestamp, min:bigint, max:bigint, approx_distinct:varbinary, count:bigint]
             CPU: 1.00ms (7.69%), Scheduled: 12.00ms (17.65%), Output: 7 rows (1.62kB)
             Input avg.: 0.00 rows, Input std.dev.: ?%
             Collisions avg.: ? (?% est.), Collisions std.dev.: ?%
         - LocalExchange[SINGLE] () => [partialrowcount:bigint, partialfragments:varbinary, partialcontext:varbinary, field_0:timestamp, min_5:bigint, max_6:bigint, approx_distin
                 CPU: 0.00ns (0.00%), Scheduled: 2.00ms (2.94%), Output: 10 rows (2.16kB)
                 Input avg.: 1.25 rows, Input std.dev.: 52.92%
             - TableWriter => [partialrowcount:bigint, partialfragments:varbinary, partialcontext:varbinary, field_0:timestamp, min_5:bigint, max_6:bigint, approx_distinct_7:varb
                     CPU: 7.00ms (53.85%), Scheduled: 29.00ms (42.65%), Output: 10 rows (2.16kB)
                     Input avg.: 0.00 rows, Input std.dev.: ?%
                     Collisions avg.: ? (?% est.), Collisions std.dev.: ?%
                     orderkey := orderkey (1:44)
                     shipdate := field_0 (1:44)
                     Statistics collected: 4
                 - LocalExchange[ROUND_ROBIN] () => [orderkey:bigint, field_0:timestamp]
                         CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 1 row (18B)
                         Input avg.: 0.06 rows, Input std.dev.: 387.30%
                     - RemoteSource[3] => [orderkey:bigint, field_0:timestamp]
                             CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 1 row (18B)
                             Input avg.: 0.06 rows, Input std.dev.: 387.30%

 Fragment 3 [SINGLE]
     CPU: 2.07ms, Scheduled: 21.30ms, Input: 1 row (14B); per task: avg.: 1.00 std.dev.: 0.00, Output: 1 row (18B)
     Output layout: [orderkey, field_0]
     Output partitioning: ROUND_ROBIN []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Project[projectLocality = LOCAL] => [orderkey:bigint, field_0:timestamp]
             CPU: 1.00ms (7.69%), Scheduled: 19.00ms (27.94%), Output: 1 row (18B)
             Input avg.: 0.25 rows, Input std.dev.: 173.21%
             orderkey := CAST(field AS bigint) (1:45)
         - LocalExchange[ROUND_ROBIN] () => [field:integer, field_0:timestamp]
                 CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 1 row (14B)
                 Input avg.: 1.00 rows, Input std.dev.: 0.00%
             - Values => [field:integer, field_0:timestamp]
                     CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 1 row (14B)
                     Input avg.: 1.00 rows, Input std.dev.: 0.00%
                     (INTEGER'1', TIMESTAMP'1969-12-31 00:00:00.000')

(1 row)

presto:temp> select * from "ts_partitioned$partitions";
        shipdate
-------------------------
 1969-12-31 00:00:00.000
(1 row)

You can see that TableWriter writes the data based on the value of field_0:timestamp. Internally it applies the hash function to that value and decides which partition (directory) the row goes to. If you insert another row with the value timestamp '1969-12-31', the new row goes to the same partition (directory).

presto:temp> insert into ts_partitioned values (2, timestamp '1969-12-31');

presto:temp> select * from "ts_partitioned$partitions";
        shipdate
-------------------------
 1969-12-31 00:00:00.000

Two files were created in the same folder:

Yings-MacBook-Pro:ts_partitioned yingsu$ cd shipdate=1969-12-31 00%3A00%3A00.0
Yings-MacBook-Pro:shipdate=1969-12-31 yingsu$ ls -lhta
total 48
-rw-r--r--  1 yingsu  staff    12B Feb 22 18:39 .20230223_023148_00030_5u5f7_0590627a-3b10-4009-a708-24441a5068a6.crc
-rw-r--r--  1 yingsu  staff    12B Feb 22 18:39 .20230223_021514_00024_5u5f7_b0492c47-4c18-46d1-bcec-e54d15c9a252.crc
-rw-r--r--  1 yingsu  staff   900B Feb 22 18:38 .prestoSchema
drwxr-xr-x  8 yingsu  staff   256B Feb 22 18:36 .
-rw-r--r--  1 yingsu  staff    16B Feb 22 18:36 ..prestoSchema.crc
-rw-r--r--  1 yingsu  staff   334B Feb 22 18:36 20230223_023148_00030_5u5f7_0590627a-3b10-4009-a708-24441a5068a6
-rw-r--r--  1 yingsu  staff   334B Feb 22 18:36 20230223_021514_00024_5u5f7_b0492c47-4c18-46d1-bcec-e54d15c9a252
drwxrwxrwx  7 yingsu  staff   224B Feb 22 18:35 ..

Similarly, analyze ts_partitioned also treats partitions as values:

presto:temp> explain analyze ts_partitioned;
                                                                                                                                                                    Query Plan
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 - Output[rows] => [rows:bigint]
     - StatisticsWriter[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=temp, tableName=ts_partitioned, analyzePartitionValues=Optional.empty}', layo
         - LocalExchange[SINGLE] () => [shipdate:timestamp, rowcount:bigint, min_value_orderkey:bigint, max_value_orderkey:bigint, number_of_distinct_values_orderkey:bigint, numb
             - RemoteStreamingExchange[GATHER] => [shipdate:timestamp, rowcount:bigint, min_value_orderkey:bigint, max_value_orderkey:bigint, number_of_distinct_values_orderkey:b
                 - Project[projectLocality = LOCAL] => [shipdate:timestamp, number_of_non_null_values_orderkey:bigint, min_value_orderkey:bigint, max_value_orderkey:bigint, rowco
                     - Aggregate(FINAL)[shipdate][$hashvalue] => [shipdate:timestamp, $hashvalue:bigint, number_of_non_null_values_orderkey:bigint, min_value_orderkey:bigint, max
                             number_of_non_null_values_orderkey := "presto.default.count"((count_0)) (1:10)
                             min_value_orderkey := "presto.default.min"((min)) (1:10)
                             max_value_orderkey := "presto.default.max"((max)) (1:10)
                             rowcount := "presto.default.count"((count))
                             number_of_distinct_values_orderkey := "presto.default.approx_distinct"((approx_distinct)) (1:10)
                         - LocalExchange[HASH][$hashvalue] (shipdate) => [shipdate:timestamp, count:bigint, approx_distinct:varbinary, count_0:bigint, min:bigint, max:bigint, $ha
                             - RemoteStreamingExchange[REPARTITION][$hashvalue_1] => [shipdate:timestamp, count:bigint, approx_distinct:varbinary, count_0:bigint, min:bigint, max
                                 - Aggregate(PARTIAL)[shipdate][$hashvalue_2] => [shipdate:timestamp, $hashvalue_2:bigint, count:bigint, approx_distinct:varbinary, count_0:bigint
                                         count := "presto.default.count"(*)
                                         approx_distinct := "presto.default.approx_distinct"((orderkey)) (1:10)
                                         count_0 := "presto.default.count"((orderkey)) (1:10)
                                         min := "presto.default.min"((orderkey)) (1:10)
                                         max := "presto.default.max"((orderkey)) (1:10)
                                     - ScanProject[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=temp, tableName=ts_partitioned, analyzePar
                                             Estimates: {rows: 4 (108B), cpu: 72.00, memory: 0.00, network: 0.00}/{rows: 4 (108B), cpu: 180.00, memory: 0.00, network: 0.00}
                                             $hashvalue_2 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(shipdate), BIGINT'0')) (1:9)
                                             LAYOUT: temp.ts_partitioned{}
                                             orderkey := orderkey:bigint:0:REGULAR (1:9)
                                             shipdate := shipdate:timestamp:-13:PARTITION_KEY (1:9)
                                                 :: [["1969-12-31 00:00:00.000"]]

The partition value is input to StatisticsWriter as a column (Presto Block), and all data with the same timestamp value is aggregated together, i.e. Presto considers it to come from the same partition. If you manually add another folder shipdate=1969-12-31 and put 2 rows in it, then the data in this folder, together with the 2 rows in folder shipdate=1969-12-31 00%3A00%3A00.0, would be aggregated into ONE computed statistics object, with ROW_COUNT=4.

This behavior is a more fundamental issue (if it's considered an issue at all). If you think the two folders should have TWO statistics objects, Presto needs a fundamental change. In this sense, recognizing both folders as one partition does make sense and does not hurt stats collection. Suppose the user wants all data where shipdate = timestamp '1969-12-31'; they should get all 4 rows, not just the 2 rows in folder shipdate=1969-12-31.

Now let's step back one level: suppose the user does NOT have another folder with the same timestamp value, but lands data using Spark, which names the HDFS directory in a different format than Presto does. If Presto cannot recognize it as a valid partition when running analyze table, that is indeed a bug.

That being said, normalizing the partition values in "analyze table" seems the more appropriate fix, because 1) Presto only generates one statistics object anyway, and 2) it is a relatively smaller change than changing how Presto reasons about and operates on partition values.

cc @tdcmeehan @rschlussel @imjalpreet What do you think? Any other ideas?

imjalpreet commented 1 year ago

@yingsu00 I think normalising the partition values sounds good. IMO, if the partition column is defined as a timestamp, then it makes sense for all of 1970-01-01, 1970-01-01 00:00:00, and 1970-01-01 00:00:00.0 to correspond to the same value. For a timestamp, all three have the same logical significance, and the current implementation would return rows for all three representations when querying anyway. So, IMO, if we go ahead and normalise the different string representations, it should not be a problem.

majetideepak commented 1 year ago

I did some more investigation into supporting the multiple valid timestamp values, and things seem to work as expected on the query side. Insert/delete of partitions will need more work to handle non-standard timestamp partition values coming from, say, Spark, e.g. 1970-01-01 00:00:00.

keweishang commented 1 year ago

From a Presto user's point of view, in our case, we never have multiple folders for the same partition. Even if we did, it would be acceptable behavior for select and analyze to aggregate the results from the multiple folders.

We also don't use Presto for insert and delete operations, as we use Spark to write data, and Presto is mainly for querying.

Not sure if what I'm suggesting makes sense, but if it is really difficult to support this in analyze by default, at least making it configurable would make users' lives much easier and make Presto more compatible with other tools such as Spark.
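
For illustration only, a sketch of what such an opt-in switch could look like, assuming the Airlift-style @Config pattern used elsewhere in presto-hive; the class name and property name here are hypothetical, not an existing Presto option:

import com.facebook.airlift.configuration.Config;

public class HiveAnalyzeConfig
{
    // Hypothetical opt-in flag, disabled by default so existing behavior is unchanged.
    private boolean normalizeTimestampPartitionValues;

    public boolean isNormalizeTimestampPartitionValues()
    {
        return normalizeTimestampPartitionValues;
    }

    @Config("hive.analyze.normalize-timestamp-partition-values")
    public HiveAnalyzeConfig setNormalizeTimestampPartitionValues(boolean normalizeTimestampPartitionValues)
    {
        this.normalizeTimestampPartitionValues = normalizeTimestampPartitionValues;
        return this;
    }
}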

yingsu00 commented 1 year ago

@majetideepak Thanks for looking at this problem again. Could you please elaborate a bit more on what you plan to do for insert/delete? I did notice there is some discrepancy in the partition operations. For example, if there are two folders with the same partition value, querying "table$partitions" would return 2 partitions. However, that problem, together with insert/delete, doesn't need to block this fix for analyze table. We can fix this particular problem by normalizing the partition values, which unblocks the customer, and we can start a broader discussion on how to unify Presto's behavior on partitions later. What do you think?

majetideepak commented 1 year ago

@yingsu00 I agree that we should fix the normalization of the partition values first. I have started working on this. I noticed that deleting a partition will delete the files but not the partition directories in other timestamp formats. We can fix this later. Users might also want to write to a Spark-format timestamp partition, which can also be supported later.

yingsu00 commented 1 year ago

@majetideepak That's great, thank you! How about we discuss the partition deletion/counting issues in the TSC meeting next week to get people's consensus on what the correct behavior should be?

majetideepak commented 1 year ago

@yingsu00 definitely worth discussing in the TSC meeting tomorrow. Will add it to the agenda.