apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] The issue of Hive metadata synchronization delay. #11714

Open Toroidals opened 2 months ago

Toroidals commented 2 months ago


Describe the problem you faced

The issue of Hive metadata synchronization delay.

Flink writes to Hudi using the upsert operation, and the tables are synced to Hive in real time. Both Hive and Spark SQL are used to query the read-optimized (RO) table of the MOR (Merge-On-Read) table. However, Spark returns significantly more rows, and more up-to-date data, than Hive does.

options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));
options.put(FlinkOptions.COMPACTION_TASKS.key(), compactionTasks);
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));
options.put(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), "true");

options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), infoMap.get("hudi_hive_sync_partition_fields"));
options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), infoMap.get("hudi_hive_sync_partition_fields"));
options.put(FlinkOptions.WRITE_TASKS.key(), writeTasks);
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.value());
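
For reference, here is a minimal sketch of how an options map like the one above is typically handed to the Flink-to-Hudi sink via HoodiePipeline. The table name, schema, key/partition fields, and the input stream below are hypothetical placeholders, not the reporter's actual job:

```java
// Minimal sketch (table name, schema, key and partition fields, and the input
// stream are hypothetical) of wiring an options map into the Hudi Flink sink.
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.util.HoodiePipeline;

public class HudiSinkSketch {
    public static void attachSink(DataStream<RowData> cdcStream, Map<String, String> options) {
        HoodiePipeline.Builder builder = HoodiePipeline.builder("ods_xxx01_cdc") // hypothetical table name
                .column("id BIGINT")
                .column("last_update_date TIMESTAMP(3)")
                .column("dt STRING")
                .pk("id")          // record key field
                .partition("dt")   // partition path field
                .options(options); // the map built above
        // false = the input stream is unbounded (continuous CDC ingestion)
        builder.sink(cdcStream, false);
    }
}
```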

2024-07-30 17:08:27,609 INFO org.apache.hudi.hive.HiveSyncTool [] - Storage partitions scan complete. Found 0
2024-07-30 17:08:27,730 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx01_cdc
2024-07-30 17:08:27,730 INFO org.apache.hudi.hive.HiveSyncTool [] - Trying to sync hoodie table ods_xxx01_cdc_rt with base path hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx01_cdc of type MERGE_ON_READ
2024-07-30 17:08:27,864 INFO org.apache.hudi.hive.ddl.HMSDDLExecutor [] - Time taken to getTableSchema: 30 ms
2024-07-30 17:08:27,866 INFO org.apache.hudi.hive.HiveSyncTool [] - No Schema difference for ods_xxx01_cdc_rt
2024-07-30 17:08:27,954 INFO org.apache.hudi.hive.HiveSyncTool [] - Last commit time synced was found to be 20240730170330152, last commit completion time is found to be 20240730170543129
2024-07-30 17:08:27,978 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx02_cdc_rt
2024-07-30 17:08:27,978 INFO org.apache.hudi.hive.HiveSyncTool [] - Trying to sync hoodie table ods_xxx02_cdc with base path hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx02_cdc of type MERGE_ON_READ
2024-07-30 17:08:27,982 INFO org.apache.hudi.hive.HiveSyncTool [] - Storage partitions scan complete. Found 0
2024-07-30 17:08:28,130 INFO org.apache.hudi.hive.ddl.HMSDDLExecutor [] - Time taken to getTableSchema: 32 ms
2024-07-30 17:08:28,132 INFO org.apache.hudi.hive.HiveSyncTool [] - No Schema difference for ods_xxx02_cdc
2024-07-30 17:08:28,236 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx03_cdc_rt
2024-07-30 17:08:28,236 INFO org.apache.hudi.hive.HiveSyncTool [] - Trying to sync hoodie table ods_xxx03_cdc with base path hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx03_cdc of type MERGE_ON_READ
2024-07-30 17:08:28,285 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx04_cdc_rt
2024-07-30 17:08:28,285 INFO org.apache.hudi.hive.HiveSyncTool [] - Trying to sync hoodie table ods_xxx04_cdc with base path hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx04_cdc of type MERGE_ON_READ
2024-07-30 17:08:28,388 INFO org.apache.hudi.hive.ddl.HMSDDLExecutor [] - Time taken to getTableSchema: 31 ms
2024-07-30 17:08:28,388 INFO org.apache.hudi.hive.HiveSyncTool [] - No Schema difference for ods_xxx03_cdc
2024-07-30 17:08:28,397 INFO org.apache.hudi.hive.ddl.HMSDDLExecutor [] - Time taken to getTableSchema: 27 ms
2024-07-30 17:08:28,397 INFO org.apache.hudi.hive.HiveSyncTool [] - No Schema difference for ods_xxx04_cdc
2024-07-30 17:08:28,596 INFO org.apache.hudi.hive.HiveSyncTool [] - Last commit time synced was found to be 20240730170543850, last commit completion time is found to be 20240730170824687
2024-07-30 17:08:28,596 INFO org.apache.hudi.hive.HiveSyncTool [] - Storage partitions scan complete. Found 0
2024-07-30 17:08:28,677 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx01_cdc_rt
2024-07-30 17:08:28,677 INFO org.apache.hudi.hive.HiveSyncTool [] - Trying to sync hoodie table ods_xxx01_cdc with base path hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx01_cdc of type MERGE_ON_READ
2024-07-30 17:08:28,788 INFO org.apache.hudi.hive.ddl.HMSDDLExecutor [] - Time taken to getTableSchema: 23 ms
2024-07-30 17:08:28,792 INFO org.apache.hudi.hive.HiveSyncTool [] - No Schema difference for ods_xxx01_cdc
2024-07-30 17:08:28,812 INFO org.apache.hudi.hive.HiveSyncTool [] - Last commit time synced was found to be 20240730170543728, last commit completion time is found to be 20240730170824687
2024-07-30 17:08:28,813 INFO org.apache.hudi.hive.HiveSyncTool [] - Storage partitions scan complete. Found 0
2024-07-30 17:08:28,825 INFO org.apache.hudi.hive.HiveSyncTool [] - Last commit time synced was found to be 20240730170543314, last commit completion time is found to be 20240730170824746
2024-07-30 17:08:28,826 INFO org.apache.hudi.hive.HiveSyncTool [] - Storage partitions scan complete. Found 0
2024-07-30 17:08:29,003 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx02_cdc
2024-07-30 17:08:29,375 INFO org.apache.hudi.hive.HiveSyncTool [] - Last commit time synced was found to be 20240730170544219, last commit completion time is found to be 20240730170824789
2024-07-30 17:08:29,376 INFO org.apache.hudi.hive.HiveSyncTool [] - Storage partitions scan complete. Found 0
2024-07-30 17:08:29,455 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx03_cdc
2024-07-30 17:08:29,464 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx04_cdc
2024-07-30 17:08:29,857 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx05_cdc_rt
2024-07-30 17:08:29,857 INFO org.apache.hudi.hive.HiveSyncTool [] - Trying to sync hoodie table ods_xxx05_cdc with base path hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx05_cdc of type MERGE_ON_READ
2024-07-30 17:08:30,071 INFO org.apache.hudi.hive.ddl.HMSDDLExecutor [] - Time taken to getTableSchema: 32 ms
2024-07-30 17:08:30,072 INFO org.apache.hudi.hive.HiveSyncTool [] - No Schema difference for ods_xxx05_cdc
2024-07-30 17:08:30,350 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx01_cdc
2024-07-30 17:08:30,552 INFO org.apache.hudi.hive.HiveSyncTool [] - Last commit time synced was found to be 20240730170545190, last commit completion time is found to be 20240730170824687
2024-07-30 17:08:30,552 INFO org.apache.hudi.hive.HiveSyncTool [] - Storage partitions scan complete. Found 0
2024-07-30 17:08:31,623 INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for ods_xxx05_cdc


ad1happy2go commented 2 months ago

@Toroidals Can you share the code you are using to read with Spark? By default Spark uses the snapshot view. Or are you querying the _ro table itself rather than reading by path?

Both Spark and Hive will read only the parquet base files for the RO table, so ideally they should give similar results.
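
To make that distinction concrete, here is a hedged sketch of the three read paths being discussed. The database/table names and the base path are placeholders taken from the sync log above, not verified configuration:

```java
// Hedged sketch: snapshot read by path, read-optimized read by path,
// and the Hive-synced RO table via the metastore (names are placeholders).
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HudiReadViews {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hudi-read-views")
                .enableHiveSupport()
                .getOrCreate();
        String basePath = "hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx01_cdc";

        // 1) Snapshot view (default when loading by path): base files merged with
        //    log files, so it sees data up to the latest delta commit.
        Dataset<Row> snapshot = spark.read().format("hudi").load(basePath);

        // 2) Read-optimized view of the same path: only the compacted base parquet files.
        Dataset<Row> readOptimized = spark.read().format("hudi")
                .option("hoodie.datasource.query.type", "read_optimized")
                .load(basePath);

        // 3) The Hive-synced RO table via the metastore (what HiveServer2 also queries).
        Dataset<Row> roTable = spark.sql(
                "select * from hudi.ods_xxx01_cdc order by last_update_date desc");

        snapshot.show(5);
        readOptimized.show(5);
        roTable.show(5);
    }
}
```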

Toroidals commented 2 months ago

> @Toroidals Can you share the code you are using to read with Spark? By default Spark uses the snapshot view. Or are you querying the _ro table itself rather than reading by path?
>
> Both Spark and Hive will read only the parquet base files for the RO table, so ideally they should give similar results.

Hive reads the RO table through HiveServer2, and Spark reads the same RO table through Spark SQL, using the same SQL query, for example: "select * from table_name_01_ro order by last_update_date desc". Spark SQL returns last_update_date values as recent as about 5 minutes before the current time, while Hive only returns values up to about 2 hours behind the current time.

Observed behavior: Spark SQL queries the RO table with a delay of around 5 minutes, while HiveServer2 queries the RO table with a delay of around 2 hours.
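
One way to narrow this down is a hypothetical diagnostic (all names below are placeholders): compare the newest last_update_date visible to a snapshot read of the base path with the one visible through the Hive-synced RO table. If Spark SQL's ~5 minute freshness matches the snapshot read rather than the RO table, Spark is effectively running a snapshot query, and Hive's ~2 hour lag would then simply be the time since the last completed compaction.

```java
// Hypothetical diagnostic: measure the freshness gap between a snapshot read
// of the base path and the Hive-synced RO table (placeholder names/paths).
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FreshnessCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("freshness-check")
                .enableHiveSupport()
                .getOrCreate();
        String basePath = "hdfs:/apps/hive/warehouse/hudi.db/hudi_xxx01_cdc";

        // Snapshot read by path: base parquet files merged with the latest log files.
        Row snapshotMax = spark.read().format("hudi").load(basePath)
                .selectExpr("max(last_update_date) as max_ts").first();

        // RO table through the metastore: compacted base files only.
        Row roMax = spark.sql("select max(last_update_date) from hudi.ods_xxx01_cdc").first();

        System.out.println("snapshot read : " + snapshotMax.get(0));
        System.out.println("RO table read : " + roMax.get(0));
        // A large gap between the two values points at compaction frequency rather
        // than Hive metadata sync as the source of the staleness seen in Hive.
    }
}
```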

Toroidals commented 2 months ago

Hive query result: (screenshot failed to upload)

Toroidals commented 2 months ago

Spark SQL query result: (screenshot failed to upload)