apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hive Sync Tool parses timestamp field as bigint in Hive metastore #7730

Open Ericliu249 opened 1 year ago

Ericliu249 commented 1 year ago

Describe the problem you faced

We use Flink to ingest data into Hudi and the Hive Sync Tool to sync metadata to our Hive metastore. A field was declared as Timestamp type when writing to Hudi from Flink. In hoodie.properties the field's type is shown as long with a timestamp-micros logicalType, but when we check the Hive metastore backend DB, the TYPE_NAME for that field is stored as bigint. We verified this by querying with Trino, which returned numbers like 1595365357402000 instead of the 2020-07-22 16:46:11.038 +00:00 format stored in the parquet files.
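
For illustration, the symptom as seen from a Trino query (catalog, schema, and table names here are hypothetical):

        -- Querying the synced table through Trino; names are hypothetical.
        SELECT updated_ts FROM hive.default.offer_usages LIMIT 1;
        -- returns a raw epoch-micros bigint such as 1595365357402000,
        -- not a timestamp value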

We tried adding the 'hoodie.datasource.write.hive_style_partitioning' = 'true' and 'hoodie.datasource.hive_sync.support_timestamp' = 'true' configs, but that didn't work.

This is similar to https://github.com/apache/hudi/issues/2509

Expected behavior

The data type in the Hive metastore should be timestamp when Flink writes a Timestamp field to Hudi.

Environment Description

Additional context

The timestamp field coming from Kafka is in a format like 2020-07-22T16:46:11.038532Z, with string data type. We used the TO_TIMESTAMP(REPLACE(REPLACE(updated, 'T', ' '), 'Z', '')) expression to convert it from string to timestamp. The parquet files store the timestamp field in a format like 2020-07-22 16:46:11.038 +00:00. In hoodie.properties, the field's type is:

            "type": [
                "null",
                {
                    "type": "long",
                    "logicalType": "timestamp-micros"
                }
            ]
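
A minimal Flink SQL sketch of the conversion described above; the source table and the non-timestamp columns are hypothetical, only the TO_TIMESTAMP expression mirrors the job:

        -- Hypothetical Kafka source; only the `updated` handling mirrors the job.
        CREATE TABLE kafka_source (
            user_id    STRING,
            offer_id   STRING,
            receipt_id STRING,
            updated    STRING  -- e.g. '2020-07-22T16:46:11.038532Z'
        ) WITH (
            'connector' = 'kafka'
            -- topic/broker/format options omitted
        );

        -- TO_TIMESTAMP parses 'yyyy-MM-dd HH:mm:ss'-style strings by default,
        -- hence stripping the 'T' separator and the trailing 'Z' first:
        SELECT TO_TIMESTAMP(REPLACE(REPLACE(updated, 'T', ' '), 'Z', '')) AS updated_ts
        FROM kafka_source;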

In the Hive metastore (screenshot): the field's TYPE_NAME shows as bigint.

Hudi configs in the Flink job:

        'table.type' = 'COPY_ON_WRITE', 
        'changelog.enabled' = 'true',           
        'compaction.async.enabled' = 'true', 
        'compaction.schedule.enabled' = 'true',
        'compaction.trigger.strategy' = 'num_or_time',
        'compaction.delta_commits' = '10',
        'compaction.delta_seconds' = '3600', 
        'write.insert.cluster' = 'true',
        'write.task.max.size' = '2014',
        'write.merge.max_memory' = '1024', 
        'write.tasks' = '18',
        'write.rate.limit' = '50',
        'metadata.enabled' = 'true', 
        'hoodie.datasource.write.recordkey.field' = 'user_id,offer_id,receipt_id',
        'hoodie.datasource.write.precombine.field' = 'updated', 
        'hoodie.datasource.write.partitionpath.field' = 'offer_id',
        'hoodie.datasource.write.hive_style_partitioning' = 'true',
        'hoodie.datasource.hive_sync.support_timestamp' = 'true',
        'hive_sync.support_timestamp' = 'true',
        'hoodie.index.type' = 'SIMPLE',
        'hoodie.compact.inline.max.delta.seconds' = '3600',
        'hoodie.write.markers.type' = 'timeline_server_based',
        'hoodie.clean.async' = 'true',
        'hoodie.cleaner.policy.failed.writes' = 'LAZY',
        'hoodie.filesystem.operation.retry.enable' = 'true',
        'hoodie.filesystem.operation.retry.initial_interval_ms' = '100',
        'hoodie.filesystem.operation.retry.max_interval_ms' = '2000',
        'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
        'hoodie.write.lock.provider' = 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
        'hoodie.write.lock.wait_time_ms' = '60000',
        'hoodie.write.lock.num_retries' = '15',
        'hoodie.write.lock.client.wait_time_ms' = '10000',
        'hoodie.write.lock.client.num_retries' = '10',
        'hoodie.write.lock.dynamodb.table' = 'dev_hudi_lock',
        'hoodie.write.lock.dynamodb.partition_key' = 'offer_usages',
        'hoodie.write.lock.dynamodb.region' = 'us-east-1',
        'hoodie.write.lock.dynamodb.billing_mode' = 'PAY_PER_REQUEST',
        'hoodie.write.lock.dynamodb.endpoint_url' = 'dynamodb.us-east-1.amazonaws.com'
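
For context, a trimmed sketch of how such options sit in a Flink DDL for the hudi connector; the schema, path, and metastore URI below are placeholders rather than the job's actual values:

        CREATE TABLE offer_usages (
            user_id    STRING,
            offer_id   STRING,
            receipt_id STRING,
            updated_ts TIMESTAMP(6),
            PRIMARY KEY (user_id, offer_id, receipt_id) NOT ENFORCED
        ) PARTITIONED BY (offer_id) WITH (
            'connector' = 'hudi',
            'path' = 's3a://<bucket>/<table-path>',  -- placeholder
            'table.type' = 'COPY_ON_WRITE',
            'hive_sync.enabled' = 'true',
            'hive_sync.mode' = 'hms',                -- sync through the metastore client
            'hive_sync.metastore.uris' = 'thrift://<host>:9083',  -- placeholder
            'hive_sync.support_timestamp' = 'true'
        );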

Stacktrace

2023-01-22 23:02:10,479 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(60)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-01-22 23:02:10,662 INFO  [main] impl.MetricsConfig (MetricsConfig.java:loadFirst(120)) - Loaded properties from hadoop-metrics2.properties
2023-01-22 23:02:10,753 INFO  [main] impl.MetricsSystemImpl (MetricsSystemImpl.java:startTimer(378)) - Scheduled Metric snapshot period at 10 second(s).
2023-01-22 23:02:10,753 INFO  [main] impl.MetricsSystemImpl (MetricsSystemImpl.java:start(191)) - s3a-file-system metrics system started
2023-01-22 23:02:11,536 INFO  [main] conf.HiveConf (HiveConf.java:findConfigFile(187)) - Found configuration file file:/usr/hive/conf/hive-site.xml
2023-01-22 23:02:11,714 INFO  [main] table.HoodieTableMetaClient (HoodieTableMetaClient.java:<init>(117)) - Loading HoodieTableMetaClient from s3a://lakehouse-flink-kafka-hudi-dev/hudi/offer-usages/data/
2023-01-22 23:02:11,780 INFO  [main] table.HoodieTableConfig (HoodieTableConfig.java:<init>(242)) - Loading table properties from s3a://lakehouse-flink-kafka-hudi-dev/hudi/offer-usages/data/.hoodie/hoodie.properties
2023-01-22 23:02:11,816 INFO  [main] table.HoodieTableMetaClient (HoodieTableMetaClient.java:<init>(136)) - Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://lakehouse-flink-kafka-hudi-dev/hudi/offer-usages/data/
2023-01-22 23:02:11,816 INFO  [main] table.HoodieTableMetaClient (HoodieTableMetaClient.java:<init>(139)) - Loading Active commit timeline for s3a://lakehouse-flink-kafka-hudi-dev/hudi/offer-usages/data/
2023-01-22 23:02:11,889 INFO  [main] timeline.HoodieActiveTimeline (HoodieActiveTimeline.java:<init>(129)) - Loaded instants upto : Option{val=[==>20230122225740004__commit__INFLIGHT]}
2023-01-22 23:02:12,269 INFO  [main] metastore.HiveMetaStoreClient (HiveMetaStoreClient.java:open(441)) - Trying to connect to metastore with URI thrift://hive-metastore.hive-metastore.svc.cluster.local:9083
2023-01-22 23:02:12,286 INFO  [main] metastore.HiveMetaStoreClient (HiveMetaStoreClient.java:open(517)) - Opened a connection to metastore, current connections: 1
2023-01-22 23:02:12,302 INFO  [main] metastore.HiveMetaStoreClient (HiveMetaStoreClient.java:open(570)) - Connected to metastore.
2023-01-22 23:02:12,302 INFO  [main] metastore.RetryingMetaStoreClient (RetryingMetaStoreClient.java:<init>(97)) - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=root (auth:SIMPLE) retries=1 delay=1 lif
2023-01-22 23:02:12,416 INFO  [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(137)) - Syncing target hoodie table with hive table(offer_usages). Hive metastore URL :jdbc:hive2://localhost:10000, basePath :s3a://lakehouse-flink-kafka-hudi-dev/hud
2023-01-22 23:02:12,416 INFO  [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(179)) - Trying to sync hoodie table offer_usages with base path s3a://lakehouse-flink-kafka-hudi-dev/hudi/offer-usages/data of type COPY_ON_WRITE
2023-01-22 23:02:12,748 INFO  [main] table.TableSchemaResolver (TableSchemaResolver.java:readSchemaFromParquetBaseFile(395)) - Reading schema from s3a://lakehouse-flink-kafka-hudi-dev/hudi/offer-usages/data/offer_id=5e98960307de32145d297505/4b63c6f7-bf57-43
2023-01-22 23:02:12,830 INFO  [main] s3a.S3AInputStream (S3AInputStream.java:seekInStream(304)) - Switching to Random IO seek policy
2023-01-22 23:02:13,125 INFO  [main] ddl.HMSDDLExecutor (HMSDDLExecutor.java:getTableSchema(172)) - Time taken to getTableSchema: 23 ms
2023-01-22 23:02:13,130 INFO  [main] metastore.HiveMetaStoreClient (HiveMetaStoreClient.java:close(600)) - Closed a connection to metastore, current connections: 0
Exception in thread "main" org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing offer_usages
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:143)
    at org.apache.hudi.hive.HiveSyncTool.main(HiveSyncTool.java:435)
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Could not convert field Type from TIMESTAMP to bigint for field updated_ts
    at org.apache.hudi.hive.util.HiveSchemaUtil.getSchemaDifference(HiveSchemaUtil.java:103)
    at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:284)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:219)
    at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:152)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:140)
    ... 1 more
2023-01-22 23:02:13,134 INFO  [shutdown-hook-0] impl.MetricsSystemImpl (MetricsSystemImpl.java:stop(210)) - Stopping s3a-file-system metrics system...
2023-01-22 23:02:13,134 INFO  [shutdown-hook-0] impl.MetricsSystemImpl (MetricsSystemImpl.java:stop(216)) - s3a-file-system metrics system stopped.
2023-01-22 23:02:13,134 INFO  [shutdown-hook-0] impl.MetricsSystemImpl (MetricsSystemImpl.java:shutdown(611)) - s3a-file-system metrics system shutdown complete
danny0405 commented 1 year ago

cc @lokeshj1703, can you take a look? Seems to be another duplicate issue: #7724

archibald-nice commented 1 year ago

@danny0405 @lokeshj1703 I had the same problem when using Flink to ingest data (one MySQL table) into Hudi while syncing metadata to Hive. Env versions as follows:

I have tried setting the param 'hive_sync.support_timestamp' = 'true' and declaring the column update_time as timestamp(6), but the value still shows up as a bigint when querying from the Hive CLI. A sketch of the attempt is below.
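
A minimal sketch of that attempt; the table name, path, and the other column are hypothetical:

        -- Hypothetical sink table; update_time is declared explicitly as TIMESTAMP(6).
        CREATE TABLE hudi_sink (
            id          BIGINT,
            update_time TIMESTAMP(6),
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'hudi',
            'path' = '<table-path>',                 -- placeholder
            'hive_sync.enabled' = 'true',
            'hive_sync.support_timestamp' = 'true'   -- the param that was set
        );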

Expecting a reply 😀

codope commented 1 year ago

There is a known issue with the timestamp type. I would suggest trying out #3391.

ad1happy2go commented 1 year ago

@Ericliu249 Were you able to try out the suggested approach? Are you still facing this issue?