apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink-Hudi - Upsert into the same Hudi table via two different Flink pipelines (stream and batch) #10914

Closed: ChiehFu closed this issue 3 months ago

ChiehFu commented 4 months ago

Describe the problem you faced

Hi,

My team wants to build Flink pipelines to generate financial reports and save the results into a Hudi COW table.

The data sources for the report consist of two types of data: snapshot data and incremental data. To produce a complete report we need to ingest both, so we are thinking about running two Flink jobs against the same Hudi table sequentially: a batch job that processes all snapshot data up to the current time, and a stream job that continuously processes new incremental data.

According to Hudi's documentation, it uses Flink state to store index information for the records it has processed and relies on that information to perform upserts correctly. My question: since the stream job doesn't have access to the batch job's state, would Hudi in the stream job be able to correctly upsert records that were previously ingested via the batch job? If not, do you have any recommendations on how we can set up the Flink-Hudi workflow to meet our use case?

danny0405 commented 4 months ago

You can use bulk_insert for the historical data and a regular upsert for the incremental streaming ingestion. Note that when you choose the flink_state index instead of the bucket index, the incremental streaming pipeline needs to enable index bootstrap through the index.bootstrap.enabled option.
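For readers following the same path, here is a minimal Flink SQL sketch of that two-job setup. The table definition, columns, path, and source tables (snapshot_source, incremental_source) are hypothetical placeholders; the option keys (write.operation, index.bootstrap.enabled) are Hudi's documented Flink options.

```sql
-- Hypothetical Hudi COW sink; key, columns, and path are placeholders.
CREATE TABLE report_hudi (
  record_key STRING PRIMARY KEY NOT ENFORCED,
  amount DECIMAL(18, 2),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 's3://my-bucket/report_hudi',
  'table.type' = 'COPY_ON_WRITE'
);

-- Job 1 (batch): load the historical snapshot with bulk_insert.
INSERT INTO report_hudi /*+ OPTIONS('write.operation' = 'bulk_insert') */
SELECT record_key, amount, ts FROM snapshot_source;

-- Job 2 (streaming): regular upsert with the Flink state index;
-- bootstrap the index from the existing table data on the first run.
INSERT INTO report_hudi /*+ OPTIONS(
  'write.operation' = 'upsert',
  'index.bootstrap.enabled' = 'true') */
SELECT record_key, amount, ts FROM incremental_source;
```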

ChiehFu commented 3 months ago

@danny0405 thanks for replying.

I tried enabling index.bootstrap.enabled and noticed that an index_bootstrap task was created in my Flink pipeline accordingly. Initially I thought the bootstrap task would complete and reach the finished state after indexing the existing table, but it continued to run without releasing its resources. Is it expected behavior that the bootstrap task runs continuously?

danny0405 commented 3 months ago

Is it expected behavior that the bootstrap task runs continuously?

Yes, and after a checkpoint succeeds, you can disable the bootstrap by setting index.bootstrap.enabled to false.

ChiehFu commented 3 months ago

Would the steps be: checkpoint succeeds -> cancel the Flink job -> restart the Flink job from the checkpoint with bootstrap disabled?

Currently I am running the Flink job via Flink SQL in a Zeppelin notebook; do you know how I can perform the steps above?
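For reference, in a standalone Flink SQL client the restart-from-checkpoint step can be expressed with the execution.savepoint.path setting, which also accepts a retained checkpoint path. A minimal sketch; the path is a placeholder, and Zeppelin's Flink interpreter may expose the same setting through its interpreter configuration rather than a SET statement:

```sql
-- Restore the next submitted job from the retained checkpoint of job 2
-- (placeholder path; works for savepoints and retained checkpoints alike).
SET 'execution.savepoint.path' = 's3://my-bucket/checkpoints/job2/chk-42';
```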

danny0405 commented 3 months ago

Currently I am running the Flink job via Flink SQL in a Zeppelin notebook

You can disable it with a Flink SQL hint: /*+ OPTIONS('index.bootstrap.enabled'='false') */.
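Combined with the restore setting sketched earlier, the resubmitted streaming query could then look like this (reusing the hypothetical table and source names from the earlier sketch):

```sql
-- Same upsert pipeline, restored from the checkpoint, with bootstrap off.
INSERT INTO report_hudi /*+ OPTIONS(
  'write.operation' = 'upsert',
  'index.bootstrap.enabled' = 'false') */
SELECT record_key, amount, ts FROM incremental_source;
```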

ChiehFu commented 3 months ago

@danny0405 Thanks for the answer. The SQL hint worked for me for disabling index bootstrap.

danny0405 commented 3 months ago

Closing it now; feel free to reopen if you still think it is an issue.

ChiehFu commented 3 months ago

@danny0405 I have some follow-up questions. Say I run the following steps to set up my data pipeline:

  1. Run batch job 1 to bulk_insert historical data into a Hudi table
  2. Run Flink stream job 2 with index bootstrap enabled and terminate the job after a checkpoint succeeds
  3. Run Flink stream job 3 with index bootstrap disabled, restoring from the checkpoint that job 2 created

My questions are:

  1. Would the checkpoint of job 3 contain all the index information retrieved from the index bootstrap process in job 2? I am asking because I noticed a significant size difference between the checkpoints of job 2 and job 3 (500 GB in job 2 vs. < 50 GB in job 3).
  2. If job 3 fails and I need to start a job 4 using job 3's latest checkpoint, do I need to have index bootstrap enabled?

danny0405 commented 3 months ago

Would the checkpoint of job 3 contain all the index information retrieved from the index bootstrap process in job 2? I am asking because I noticed a significant size difference between the checkpoints of job 2 and job 3 (500 GB in job 2 vs. < 50 GB in job 3).

Yes, one successful checkpoint indicates the bootstrap has finished.

If job 3 fails and I need to start a job 4 using job 3's latest checkpoint, do I need to have index bootstrap enabled?

No need to do that.

BTW, if your dataset is large, the BUCKET index is preferable.
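For reference, switching to the bucket index is a table-option change rather than a separate bootstrap step, since the bucket index derives file placement by hashing the record key and keeps no index state in Flink. A minimal sketch using Hudi's documented Flink options; the table definition is the hypothetical one from earlier, and the bucket count is a placeholder to be sized from the expected per-partition data volume:

```sql
-- Hypothetical bucket-indexed variant of the sink table.
CREATE TABLE report_hudi_bucketed (
  record_key STRING PRIMARY KEY NOT ENFORCED,
  amount DECIMAL(18, 2),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 's3://my-bucket/report_hudi_bucketed',
  'table.type' = 'COPY_ON_WRITE',
  'index.type' = 'BUCKET',
  -- Buckets per partition; a placeholder value, not a recommendation.
  'hoodie.bucket.index.num.buckets' = '256'
);
```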

ChiehFu commented 3 months ago

@danny0405

Is it expected that the checkpoint size of the bucket_assigner operator drops significantly, from 500 GB in job 2 to less than 50 GB in job 3 mentioned above?

The Hudi sink table has 9.6 billion records (around 570.3 GB) and is currently partitioned by a time attribute, so the set of partitions is expected to grow dynamically and keep the data distribution even. Would my use case benefit from the bucket index compared to the Flink state index? Also, do I need to enable the index bootstrap task if I switch to the bucket index?

ChiehFu commented 3 months ago

In addition, I found some duplicates written by my bulk_insert batch job 1 and upsert stream job 2 (the one that had index bootstrap enabled).

For the bulk_insert batch job, write.precombine was set to true, so shouldn't the result table be free of duplicates?

For the upsert stream job, write.precombine was set to true and the index bootstrap task's parallelism was set to 480. I found a previous issue, https://github.com/apache/hudi/issues/4881, which suggests duplicates can happen when the index bootstrap task's parallelism is greater than 1. Is that still the case in Hudi 0.14.1? The table that needs to be index bootstrapped is large, so I am not sure setting the parallelism to 1 would work.
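As a quick sanity check, duplicates per record key can be counted with a plain batch query against the table; a sketch reusing the hypothetical key column from the earlier examples:

```sql
-- Record keys that appear more than once in a snapshot read of the table.
SELECT record_key, COUNT(*) AS cnt
FROM report_hudi
GROUP BY record_key
HAVING COUNT(*) > 1;
```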

danny0405 commented 3 months ago

You may need to read this doc first: https://www.yuque.com/yuzhao-my9fz/kb/flqll8?