apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[WIP][spark] Support timestamp_ntz #3569

Open Zouxxyy opened 1 week ago

Zouxxyy commented 1 week ago

Purpose

Spark has supported `timestamp_ntz` since Spark 3.4, so Paimon can support it as well. There were also mismatches between Paimon and Spark timestamp types before this change; the table below shows the corrected mapping:

| Spark Data Type | Paimon Data Type | Atomic Type |
|---|---|---|
| TimestampType | LocalZonedTimestamp | true |
| TimestampNTZType (Spark 3.4+) | TimestampType | true |
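The semantic difference behind this mapping can be illustrated without Spark: an instant type (Spark `TimestampType` / Paimon `LocalZonedTimestamp`) stores microseconds since the UTC epoch, while an NTZ type stores zone-free wall-clock fields. A minimal sketch in plain Python (the helper names are hypothetical, not Paimon APIs):

```python
from datetime import datetime, timedelta, timezone

MICROS = 1_000_000
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)
EPOCH_NAIVE = datetime(1970, 1, 1)

def encode_ltz(instant: datetime) -> int:
    # isAdjustedToUTC=1: micros since the UTC epoch -> identifies an instant
    return round((instant - EPOCH_UTC).total_seconds() * MICROS)

def encode_ntz(wall: datetime) -> int:
    # isAdjustedToUTC=0: micros of the naive wall-clock value, no zone attached
    return round((wall - EPOCH_NAIVE).total_seconds() * MICROS)

cest = timezone(timedelta(hours=2))
utc_noon = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
paris_noon = datetime(2024, 6, 1, 12, 0, tzinfo=cest)

# Noon in UTC and noon in CEST are different instants -> different LTZ encodings
print(encode_ltz(utc_noon) != encode_ltz(paris_noon))  # True

# NTZ keeps only the wall-clock fields, so both encode identically
print(encode_ntz(utc_noon.replace(tzinfo=None))
      == encode_ntz(paris_noon.replace(tzinfo=None)))  # True
```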

Tests

Write a Paimon parquet table and a plain Spark parquet table, then inspect both with DuckDB's `parquet_schema`:

```
D select name, type, converted_type, logical_type from parquet_schema('test_hive.db/paimon_tbl/bucket-0/data-fc7c66db-0126-4709-b44a-3243ba06660e-0.parquet');
┌───────────────┬─────────┬──────────────────┬─────────────────────────────────────────────────────────────────────────────────────────┐
│     name      │  type   │  converted_type  │                                      logical_type                                       │
│    varchar    │ varchar │     varchar      │                                         varchar                                         │
├───────────────┼─────────┼──────────────────┼─────────────────────────────────────────────────────────────────────────────────────────┤
│ paimon_schema │         │                  │                                                                                         │
│ id            │ INT32   │                  │                                                                                         │
│ ts1           │ INT64   │ TIMESTAMP_MICROS │ TimestampType(isAdjustedToUTC=1, unit=TimeUnit(MILLIS=, MICROS=MicroSeconds(), NANOS=)) │
│ ts2           │ INT64   │ TIMESTAMP_MICROS │ TimestampType(isAdjustedToUTC=0, unit=TimeUnit(MILLIS=, MICROS=MicroSeconds(), NANOS=)) │
└───────────────┴─────────┴──────────────────┴─────────────────────────────────────────────────────────────────────────────────────────┘
D select name, type, converted_type, logical_type from parquet_schema('test_hive.db/parquet_tbl/part-00000-d5360ba2-6291-429f-9c0c-57f3ef8bfd82-c000.snappy.parquet');
┌──────────────┬─────────┬──────────────────┬─────────────────────────────────────────────────────────────────────────────────────────┐
│     name     │  type   │  converted_type  │                                      logical_type                                       │
│   varchar    │ varchar │     varchar      │                                         varchar                                         │
├──────────────┼─────────┼──────────────────┼─────────────────────────────────────────────────────────────────────────────────────────┤
│ spark_schema │         │                  │                                                                                         │
│ id           │ INT32   │                  │                                                                                         │
│ ts1          │ INT64   │ TIMESTAMP_MICROS │ TimestampType(isAdjustedToUTC=1, unit=TimeUnit(MILLIS=, MICROS=MicroSeconds(), NANOS=)) │
│ ts2          │ INT64   │ TIMESTAMP_MICROS │ TimestampType(isAdjustedToUTC=0, unit=TimeUnit(MILLIS=, MICROS=MicroSeconds(), NANOS=)) │
└──────────────┴─────────┴──────────────────┴─────────────────────────────────────────────────────────────────────────────────────────┘
```
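The `isAdjustedToUTC` flag shown above is what distinguishes the two columns when reading the same stored INT64 back. A quick sketch of the interpretation (plain Python; `decode_micros` is a hypothetical helper, not Spark's actual reader):

```python
from datetime import datetime, timedelta, timezone

def decode_micros(value: int, is_adjusted_to_utc: bool,
                  reader_tz=timezone(timedelta(hours=8))):
    """Interpret a Parquet TIMESTAMP_MICROS value (sketch only).
    isAdjustedToUTC=1 -> an instant; rendering shifts it into the session zone.
    isAdjustedToUTC=0 -> wall-clock fields; rendered as-is, no shift."""
    base = datetime(1970, 1, 1) + timedelta(microseconds=value)
    if is_adjusted_to_utc:
        return (base.replace(tzinfo=timezone.utc)
                    .astimezone(reader_tz)
                    .replace(tzinfo=None))
    return base

stored = 1717243200_000_000  # 2024-06-01T12:00:00Z in micros since the epoch

# LTZ reading: shifted into the reader's zone (+08:00 here) -> 20:00
print(decode_micros(stored, True))
# NTZ reading: same wall-clock fields for every reader -> 12:00
print(decode_micros(stored, False))
```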


Write a Paimon ORC file and read its metadata with orc-tools:

```
orc-tools meta data-84a39541-1165-47f9-adba-478e193a469e-0.orc
Structure for data-84a39541-1165-47f9-adba-478e193a469e-0.orc
File Version: 0.12 with ORC_14 by ORC Java 1.9.2
Rows: 1
Compression: ZSTD
Compression size: 262144
Calendar: Julian/Gregorian
Type: struct<id:int,binary:binary,ts:timestamp with local time zone,ts_ntz:timestamp>
```



### Compatibility (WARNING!!)

For previously written Paimon tables, whose Paimon schema is `TimestampType`, Spark will interpret the type as `TimestampNTZType` and read it correctly (I copied `TimestampNTZType` into paimon-spark3.2 and paimon-spark3.3).
I manually tested reading an old Paimon timestamp table: there is no compatibility issue, and it can be read and written correctly.

For new tables created after this PR, everything works as expected: `timestamp` will be correctly parsed as `TimestampType`, while `timestamp_ntz` will be parsed as `TimestampNTZType`.
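The resulting Spark-to-Paimon mapping can be summed up as a tiny lookup. This is a hypothetical sketch for illustration (`to_paimon` and `SPARK_TO_PAIMON` are not Paimon identifiers; the real conversion lives in paimon-spark's type converter):

```python
# Spark timestamp type name -> Paimon type name, per the table in this PR.
SPARK_TO_PAIMON = {
    "TimestampType": "LocalZonedTimestamp",  # instant semantics (isAdjustedToUTC=1)
    "TimestampNTZType": "TimestampType",     # wall-clock semantics (isAdjustedToUTC=0)
}

def to_paimon(spark_type: str, spark_minor: int = 4) -> str:
    """Map a Spark timestamp type to its Paimon counterpart.
    TimestampNTZType only exists since Spark 3.4."""
    if spark_type == "TimestampNTZType" and spark_minor < 4:
        raise ValueError("timestamp_ntz requires Spark 3.4+")
    return SPARK_TO_PAIMON[spark_type]

print(to_paimon("TimestampType"))     # LocalZonedTimestamp
print(to_paimon("TimestampNTZType"))  # TimestampType
```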

### API and Format

<!-- Does this change affect API or storage format -->

### Documentation