trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Trino Iceberg not honoring existing timestamp column type name of the table created outside Trino (e.g. Spark) stored in HMS #11442

Open puchengy opened 2 years ago

puchengy commented 2 years ago

In Spark, Iceberg type timestamp with timezone is mapped to Spark timestamp (doc). So when an Iceberg table is created in Spark, HMS stores the table column type name as timestamp.

In Trino, Iceberg type timestamp with timezone (TIMESTAMPTZ) is mapped to Trino type TIMESTAMP(6) WITH TIME ZONE (doc). When writing from Trino into an Iceberg table previously created by Spark, Trino tends to perform an alter table operation with the thrift table object using the column type name timestamp with local time zone. This conflicts with the type name timestamp stored in HMS (this is the line where it breaks).

I think Trino should honor the previous column type name and avoid changing it.
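
To make the mismatch concrete, here is a minimal sketch of the comparison described above. It is not Trino's code: the class `HmsTypeNameCheck`, the map, and the method `conflicts` are hypothetical names used only for illustration, and the mapping simply encodes the Hive type names mentioned in this issue (the plain `timestamp` entry is an assumption based on the usual Iceberg-to-Hive mapping).

```java
import java.util.Map;

// Hypothetical illustration of the type-name conflict; not Trino's implementation.
final class HmsTypeNameCheck {
    // Hive type name that would be derived from each Iceberg timestamp type.
    // Assumption: timestamptz -> "timestamp with local time zone",
    // timestamp (no zone) -> "timestamp".
    static final Map<String, String> ICEBERG_TO_HIVE_TYPE_NAME = Map.of(
            "timestamptz", "timestamp with local time zone",
            "timestamp", "timestamp");

    // Returns true when the name derived from the Iceberg type differs from
    // the name already stored in HMS for that column.
    static boolean conflicts(String icebergType, String storedHmsTypeName) {
        String derived = ICEBERG_TO_HIVE_TYPE_NAME.get(icebergType);
        if (derived == null) {
            throw new IllegalArgumentException("Unknown Iceberg type: " + icebergType);
        }
        return !derived.equalsIgnoreCase(storedHmsTypeName);
    }

    public static void main(String[] args) {
        // Table created by Spark: HMS holds "timestamp" for a timestamptz column.
        System.out.println(conflicts("timestamptz", "timestamp")); // true
        // Table created by Trino or Hive: the names already agree.
        System.out.println(conflicts("timestamptz", "timestamp with local time zone")); // false
    }
}
```

The first call corresponds to the Spark-created table in this report, where the stored name and the derived name disagree.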

Way to reproduce:

  1. create an Iceberg table from SparkSQL with one column of type timestamp
  2. insert data into that Iceberg table from Trino

zhaoyim commented 2 years ago

I think this may be a Spark SQL issue. As the doc says, in Spark, Iceberg type timestamp with timezone is mapped to Spark timestamp (doc). So I think that when the table is created in Spark SQL, the type in the Hive metadata should be saved as 'timestamp with local time zone'. I read some code and found that Trino always reads the timestamp type from the Iceberg metadata and then syncs it to HMS. I think this is the right way. Some test results are attached FYI.

No matter which component describes the table (Trino, Hive, or Spark), each one shows its own correct timestamp type, which is fine. An Iceberg timestamptz column is shown by each component as follows: trino: timestamp(6) with time zone; hive: timestamp with local time zone; spark: timestamp.

trino create table then insert data from trino:

before insert:

trino:

trino> use iceberg.icebergtest;
USE
trino:icebergtest> create table testtrino1(t timestamp(6) with time zone);
CREATE TABLE
trino:icebergtest> desc testtrino1;
 Column |            Type             | Extra | Comment
--------+-----------------------------+-------+---------
 t      | timestamp(6) with time zone |       |
(1 row)

Query 20220414_092325_00004_ub3iy, FINISHED, 1 node
Splits: 11 total, 11 done (100.00%)
0.80 [1 rows, 87B] [1 rows/s, 109B/s]

hive:

0: jdbc:hive2://host-10-1-242-30:2181,host-10> desc testtrino1;
INFO  : Compiling command(queryId=hive_20220414172439_21853d93-d352-4f0c-afd6-5ff4ef4c8f15): desc testtrino1
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220414172439_21853d93-d352-4f0c-afd6-5ff4ef4c8f15); Time taken: 0.059 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20220414172439_21853d93-d352-4f0c-afd6-5ff4ef4c8f15): desc testtrino1
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220414172439_21853d93-d352-4f0c-afd6-5ff4ef4c8f15); Time taken: 0.024 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+-----------+---------------------------------+----------+
| col_name  |            data_type            | comment  |
+-----------+---------------------------------+----------+
| t         | timestamp with local time zone  |          |
+-----------+---------------------------------+----------+
1 row selected (0.115 seconds)

spark sql:

spark-sql> desc testtrino1;
t   timestamp

# Partitioning
Not partitioned
Time taken: 1.002 seconds, Fetched 4 row(s)

after insert:

trino:

trino:icebergtest> insert into testtrino1 values (timestamp '2020-01-01 01:10:10');
INSERT: 1 row

Query 20220414_092950_00008_ub3iy, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
2.11 [0 rows, 0B] [0 rows/s, 0B/s]

trino:icebergtest> desc testtrino1;
 Column |            Type             | Extra | Comment
--------+-----------------------------+-------+---------
 t      | timestamp(6) with time zone |       |
(1 row)

Query 20220414_092958_00009_ub3iy, FINISHED, 1 node
Splits: 11 total, 11 done (100.00%)
0.62 [1 rows, 88B] [1 rows/s, 143B/s]

hive:

0: jdbc:hive2://host-10-1-242-30:2181,host-10> desc testtrino1;
INFO  : Compiling command(queryId=hive_20220414173044_9efe6f66-87fd-4bf5-b51c-5a6c80c29136): desc testtrino1
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220414173044_9efe6f66-87fd-4bf5-b51c-5a6c80c29136); Time taken: 0.047 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20220414173044_9efe6f66-87fd-4bf5-b51c-5a6c80c29136): desc testtrino1
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220414173044_9efe6f66-87fd-4bf5-b51c-5a6c80c29136); Time taken: 0.022 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+-----------+---------------------------------+----------+
| col_name  |            data_type            | comment  |
+-----------+---------------------------------+----------+
| t         | timestamp with local time zone  |          |
+-----------+---------------------------------+----------+
1 row selected (0.095 seconds)

spark sql:

spark-sql> desc testtrino1;
t   timestamp

# Partitioning
Not partitioned
Time taken: 0.115 seconds, Fetched 4 row(s)

hive create table then insert data from trino:

before insert:

hive:

0: jdbc:hive2://host-10-1-242-30:2181,host-10> CREATE EXTERNAL TABLE icebergtest.testhive1(
. . . . . . . . . . . . . . . . . . . . . . .>   `name` TIMESTAMP WITH LOCAL TIME ZONE)
. . . . . . . . . . . . . . . . . . . . . . .> STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
. . . . . . . . . . . . . . . . . . . . . . .> LOCATION 'hdfs://host-10-1-242-30:8020/tmp/warehouse/icebergtest/testhive1'
. . . . . . . . . . . . . . . . . . . . . . .> TBLPROPERTIES (
. . . . . . . . . . . . . . . . . . . . . . .>   'iceberg.mr.catalog'='hadoop',
. . . . . . . . . . . . . . . . . . . . . . .>   'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://host-10-1-242-30:8020/tmp/warehouse/icebergtest/testhive1'
. . . . . . . . . . . . . . . . . . . . . . .>   );
INFO  : Compiling command(queryId=hive_20220414173624_eec5c9d1-1430-4472-81fd-e7d3e69360df): CREATE EXTERNAL TABLE icebergtest.testhive1(
`name` TIMESTAMP WITH LOCAL TIME ZONE)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://host-10-1-242-30:8020/tmp/warehouse/icebergtest/testhive1'
TBLPROPERTIES (
'iceberg.mr.catalog'='hadoop',
'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://host-10-1-242-30:8020/tmp/warehouse/icebergtest/testhive1'
)
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20220414173624_eec5c9d1-1430-4472-81fd-e7d3e69360df); Time taken: 0.281 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20220414173624_eec5c9d1-1430-4472-81fd-e7d3e69360df): CREATE EXTERNAL TABLE icebergtest.testhive1(
`name` TIMESTAMP WITH LOCAL TIME ZONE)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://host-10-1-242-30:8020/tmp/warehouse/icebergtest/testhive1'
TBLPROPERTIES (
'iceberg.mr.catalog'='hadoop',
'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://host-10-1-242-30:8020/tmp/warehouse/icebergtest/testhive1'
)
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220414173624_eec5c9d1-1430-4472-81fd-e7d3e69360df); Time taken: 2.866 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
No rows affected (4.898 seconds)

0: jdbc:hive2://host-10-1-242-30:2181,host-10> desc testhive1;
INFO  : Compiling command(queryId=hive_20220414173720_a15a2ad6-9479-45a9-9dc4-f23c4af5de20): desc testhive1
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220414173720_a15a2ad6-9479-45a9-9dc4-f23c4af5de20); Time taken: 0.051 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20220414173720_a15a2ad6-9479-45a9-9dc4-f23c4af5de20): desc testhive1
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220414173720_a15a2ad6-9479-45a9-9dc4-f23c4af5de20); Time taken: 0.079 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+-----------+---------------------------------+--------------------+
| col_name  |            data_type            |      comment       |
+-----------+---------------------------------+--------------------+
| name      | timestamp with local time zone  | from deserializer  |
+-----------+---------------------------------+--------------------+
1 row selected (0.172 seconds)

trino:

trino:icebergtest> desc testhive1;
 Column |            Type             | Extra | Comment
--------+-----------------------------+-------+---------
 name   | timestamp(6) with time zone |       |
(1 row)

Query 20220414_093754_00010_ub3iy, FINISHED, 1 node
Splits: 11 total, 11 done (100.00%)
0.65 [1 rows, 90B] [1 rows/s, 138B/s]

spark sql:

spark-sql> desc testhive1;
name    timestamp

# Partitioning
Not partitioned
Time taken: 0.252 seconds, Fetched 4 row(s)

after insert:

hive:

0: jdbc:hive2://host-10-1-242-30:2181,host-10> desc testhive1;
INFO  : Compiling command(queryId=hive_20220414173930_cb4758ed-968e-402e-9044-5b50caeab22b): desc testhive1
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220414173930_cb4758ed-968e-402e-9044-5b50caeab22b); Time taken: 0.056 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20220414173930_cb4758ed-968e-402e-9044-5b50caeab22b): desc testhive1
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220414173930_cb4758ed-968e-402e-9044-5b50caeab22b); Time taken: 0.076 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+-----------+---------------------------------+--------------------+
| col_name  |            data_type            |      comment       |
+-----------+---------------------------------+--------------------+
| name      | timestamp with local time zone  | from deserializer  |
+-----------+---------------------------------+--------------------+
1 row selected (0.164 seconds)

trino:

trino:icebergtest> insert into testhive1 values (timestamp '2020-01-01 01:10:10');
INSERT: 1 row

Query 20220414_093908_00011_ub3iy, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
2.02 [0 rows, 0B] [0 rows/s, 0B/s]

trino:icebergtest> desc testhive1;
 Column |            Type             | Extra | Comment
--------+-----------------------------+-------+---------
 name   | timestamp(6) with time zone |       |
(1 row)

Query 20220414_093913_00012_ub3iy, FINISHED, 1 node
Splits: 11 total, 11 done (100.00%)
0.58 [1 rows, 90B] [1 rows/s, 154B/s]

spark sql:

spark-sql> desc testhive1;
name    timestamp

# Partitioning
Not partitioned
Time taken: 0.099 seconds, Fetched 4 row(s)

spark create table then insert data from trino:

before insert:

trino:

trino:icebergtest> desc testspark1;
 Column |            Type             | Extra | Comment
--------+-----------------------------+-------+---------
 t      | timestamp(6) with time zone |       |
(1 row)

Query 20220414_094318_00017_ub3iy, FINISHED, 1 node
Splits: 11 total, 11 done (100.00%)
0.59 [1 rows, 88B] [1 rows/s, 150B/s]

hive:

0: jdbc:hive2://host-10-1-242-30:2181,host-10> desc testspark1;
INFO  : Compiling command(queryId=hive_20220414174253_5e37d9dd-4959-43a0-a6bb-adfd02930dec): desc testspark1
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220414174253_5e37d9dd-4959-43a0-a6bb-adfd02930dec); Time taken: 0.041 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20220414174253_5e37d9dd-4959-43a0-a6bb-adfd02930dec): desc testspark1
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220414174253_5e37d9dd-4959-43a0-a6bb-adfd02930dec); Time taken: 0.021 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| t         | timestamp  |          |
+-----------+------------+----------+
1 row selected (0.084 seconds)

spark sql:

spark-sql> create table testspark1(t timestamp);
Time taken: 0.458 seconds
spark-sql> desc testspark1;
t   timestamp

# Partitioning
Not partitioned
Time taken: 0.087 seconds, Fetched 4 row(s)

after insert:

trino:

trino:icebergtest> insert into testspark1 values (timestamp '2020-01-01 01:10:10');
INSERT: 1 row

Query 20220414_094436_00018_ub3iy, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
1.12 [0 rows, 0B] [0 rows/s, 0B/s]

trino:icebergtest> desc testspark1;
 Column |            Type             | Extra | Comment
--------+-----------------------------+-------+---------
 t      | timestamp(6) with time zone |       |
(1 row)

Query 20220414_094448_00020_ub3iy, FINISHED, 1 node
Splits: 11 total, 11 done (100.00%)
0.58 [1 rows, 88B] [1 rows/s, 152B/s]

hive:

0: jdbc:hive2://host-10-1-242-30:2181,host-10> desc testspark1;
INFO  : Compiling command(queryId=hive_20220414174524_2dc345e7-8ce6-4144-ae40-bf56e6152ecf): desc testspark1
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220414174524_2dc345e7-8ce6-4144-ae40-bf56e6152ecf); Time taken: 0.044 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20220414174524_2dc345e7-8ce6-4144-ae40-bf56e6152ecf): desc testspark1
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220414174524_2dc345e7-8ce6-4144-ae40-bf56e6152ecf); Time taken: 0.018 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
+-----------+---------------------------------+----------+
| col_name  |            data_type            | comment  |
+-----------+---------------------------------+----------+
| t         | timestamp with local time zone  |          |
+-----------+---------------------------------+----------+
1 row selected (0.087 seconds)

spark sql:

spark-sql> desc testspark1;
t   timestamp

# Partitioning
Not partitioned
Time taken: 0.092 seconds, Fetched 4 row(s)

findepi commented 2 years ago

@zhaoyim please paste code blocks (with triple backticks ```), not images. I cannot copy-paste text from an image into my terminal.

> Trino tends to perform an alter table operation with the thrift table object using the column type name timestamp with local time zone.

@puchengy do you know where it comes from?

Anyway, timestamp with local time zone is actually correct for "timestamp with zone... but without a zone" (like Java Instant). This is what Iceberg timestamptz is. In Hive, timestamp represents "timestamp without zone" (like Java LocalDateTime). This is not a point in time.

AFAIK, in Spark, timestamp is a point in time, so it should be declared in the metastore as such. Thus, it feels like Spark's fault.
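
The Instant vs. LocalDateTime distinction above can be illustrated with plain `java.time`. This is only a sketch: the class `TimestampSemantics` and the helper `resolve` are hypothetical names, and the two zones are arbitrary choices for the example.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Illustration only: a LocalDateTime is a wall-clock reading, not a point in time.
final class TimestampSemantics {
    // A wall-clock value becomes a point in time only once a zone is supplied.
    static Instant resolve(LocalDateTime wallClock, ZoneId zone) {
        return wallClock.atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        // Same literal as the INSERT statements earlier in this thread.
        LocalDateTime wallClock = LocalDateTime.of(2020, 1, 1, 1, 10, 10);

        Instant inUtc = resolve(wallClock, ZoneId.of("UTC"));
        Instant inTokyo = resolve(wallClock, ZoneId.of("Asia/Tokyo"));

        System.out.println(inUtc);                 // 2020-01-01T01:10:10Z
        System.out.println(inTokyo);               // 2019-12-31T16:10:10Z
        System.out.println(inUtc.equals(inTokyo)); // false
    }
}
```

The same wall-clock value maps to two different instants depending on the zone, which is why a column holding points in time and a column holding zone-less values need different type names in the metastore.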

zhaoyim commented 2 years ago

@findepi I updated the comment to use text instead of images, please give it a try.

yunrougong commented 11 months ago

Is this issue more or less resolved by Spark 3.4+, where Spark has the timestamp_ntz type?