RLI record preparation for the MDT has been relying on an RDD. This patch removes that dependency and generates the records on the fly, to ensure RLI updates are resilient to task/stage retries with Spark.
Current design of RLI prior to this fix:
Root cause for the potential inconsistencies:
The major point of concern is that, for preparing MDT records for some of the partitions, we are relying on an RDD. All other MDT partitions rely on HoodieCommitMetadata, which sits in the driver's memory, but RLI relies on an RDD that could be recomputed when a subset of Spark partitions goes missing.
Proposal to fix the inconsistencies:
At a high level, our proposal avoids the reliance on the RDD. We are choosing consistency over performance for the time being. Eventually we want to consider a fully streaming way of generating MDT records (ref draft patch here). We might come back post 1.0 release to revisit the full DAG rewrite, since it is more streaming friendly and we will have to take that route eventually to support minute-level commits with numerous indexes being added to our indexing subsystem.
Design
Once we trigger DT writes and call collect() to fetch the List of write statuses, no downstream computation will ever look up the RDD for MDT record preparation. For RLI and the secondary index, we will do an on-demand read of the data files to fetch the required info to prepare MDT records for these partitions. Here is the illustration of the design.
With this design change, our entire record generation for RLI and the secondary index will be resilient to Spark task retries. In fact, we trigger collect() just once for the data table, so after collecting the writeStatus/HoodieWriteStats of the data table, no downstream caller will ever dereference the DAG again, and hence there is no chance of inconsistencies.
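To make the flow concrete, here is a minimal, hypothetical sketch. The class and the prepareRliRecords/commitToMetadataTable helpers are placeholders and not the actual metadata writer code; only WriteStatus#getStat and Spark's collect() are real APIs here.

```java
import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.spark.api.java.JavaRDD;

public abstract class RliUpdateFlowSketch {

  public void updateMetadataTable(JavaRDD<WriteStatus> writeStatuses) {
    // Dereference the DAG exactly once: collect the write stats to the driver.
    List<HoodieWriteStat> writeStats =
        writeStatuses.map(WriteStatus::getStat).collect();

    // From here on, nothing looks at the RDD again. RLI records are built by
    // reading the written data files on demand, so Spark task/stage retries
    // cannot change the input we operate on.
    List<HoodieRecord> rliRecords = prepareRliRecords(writeStats);
    commitToMetadataTable(rliRecords);
  }

  // Hypothetical helpers standing in for the actual metadata writer code paths.
  protected abstract List<HoodieRecord> prepareRliRecords(List<HoodieWriteStat> writeStats);

  protected abstract void commitToMetadataTable(List<HoodieRecord> rliRecords);
}
```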
In this patch, we focus only on RLI. We will work on a follow-up patch for SI.
Let's first understand what info we need and how to fetch it.
For RLI, for every file group that got touched, we need the following (see the sketch after this list):
newly inserted record keys ➝ fileGroup ID
deleted record keys ➝ fileGroup ID
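One possible shape for that per-file-group info, as a purely illustrative holder class (not an existing Hudi type):

```java
import java.util.Set;

// Hypothetical holder: the keys newly written to a file group and the keys deleted from it.
public class FileGroupRliInfo {
  public final String fileGroupId;          // file group the keys map to
  public final Set<String> insertedKeys;    // newly inserted record keys -> fileGroupId
  public final Set<String> deletedKeys;     // deleted record keys -> tombstones in RLI

  public FileGroupRliInfo(String fileGroupId, Set<String> insertedKeys, Set<String> deletedKeys) {
    this.fileGroupId = fileGroupId;
    this.insertedKeys = insertedKeys;
    this.deletedKeys = deletedKeys;
  }
}
```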
Now, let's dive into the details of how we plan to get the required info.
How do we fetch the required info for a given file group being touched in the current commit of interest:
a. We need to read the latest image of the file slice added as part of the current commit.
b. Optionally, we need to read the previous image of the file slice, or the previous file slice (excluding the files being added as part of the current commit).
Find the difference between (a) and (b) to fetch the required info for both indexes.
The reason (b) is optional:
If the new file being considered consists purely of inserts or updates, we don't even need to look up the previous version of the file slice. Note that HoodieWriteStat gives us numInserts, numUpdates and numDeletes, and we can rely on those counters to deduce it.
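A hedged sketch of that check; needsPreviousSliceLookup is a hypothetical helper, while HoodieWriteStat#getNumDeletes is the counter we rely on:

```java
import org.apache.hudi.common.model.HoodieWriteStat;

public class PreviousSliceLookupCheck {
  // If the new file carries no deletes, its record keys are pure inserts/updates,
  // so the previous image of the file slice never needs to be read.
  static boolean needsPreviousSliceLookup(HoodieWriteStat stat) {
    return stat.getNumDeletes() > 0;
  }
}
```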
Let's clarify what "previous image of the file slice" refers to:
Essentially, it is the latest file slice excluding the file being added in the current commit.
In the case of a COW table, if a new base file is added to an existing file group, "previous image of the file slice" refers to the previous file slice (i.e. the previous base file). This also applies to compaction in the case of a MOR table.
Degenerate case: when a new base file is added to a new file group, there is no "previous image of the file slice". Every record is an insert in this case.
In the case of a new log file added to an existing file slice, "previous image of the file slice" refers to the file slice excluding the new log file.
Computing the record key ➝ fileId mapping differs for base files vs log files. Let's take a look at each of them.
Base file record key mapping computation:
RLI:
If a new base file is added for an existing file group and HoodieWriteStat shows any deletes, we might need to look into the previous base file as well to find the record key mapping. If not, we just fetch the record keys from the new base file (each could be an insert or an update, and we don't need to differentiate for RLI) and ingest them into RLI. See the sketch below.
If it is a new base file for a new file group, we read the record keys from the new base file and add them to RLI.
In the case of compaction, we follow (a). Here, the previous version of the file slice refers to just the previous base file, and we don't need to include log files from the previous file slice for comparison (just the previous base file is good enough).
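A hedged sketch of the base file path described above; readRecordKeysFromBaseFile is a hypothetical helper (e.g. reading the record key column of the parquet file), not an existing Hudi API:

```java
import java.util.HashSet;
import java.util.Set;

public abstract class BaseFileRliMapping {

  void collectRliUpdates(String newBaseFilePath,
                         String previousBaseFilePath,   // null for a brand new file group
                         long numDeletes,
                         Set<String> insertedOrUpdatedKeys,
                         Set<String> deletedKeys) {
    Set<String> newKeys = readRecordKeysFromBaseFile(newBaseFilePath);
    // Inserts and updates both map recordKey -> fileGroup ID; RLI does not need to tell them apart.
    insertedOrUpdatedKeys.addAll(newKeys);

    if (previousBaseFilePath != null && numDeletes > 0) {
      // Keys present in the previous base file but missing from the new one were deleted in this commit.
      Set<String> previousKeys = new HashSet<>(readRecordKeysFromBaseFile(previousBaseFilePath));
      previousKeys.removeAll(newKeys);
      deletedKeys.addAll(previousKeys);
    }
  }

  // Hypothetical: read the record keys stored in a base file.
  abstract Set<String> readRecordKeysFromBaseFile(String baseFilePath);
}
```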
Log files record key mapping computation:
With RLI, we can't have inserts going to log files. So we can completely ignore reading log files that have only data blocks; in other words, log files with data blocks can only contain updates. Still, some payload implementations could realize deletes via custom logic, where a delete may be seen as an update. To account for those cases, we let the next compaction take care of realizing the deletes from the file group of interest.
To summarize, if we have log files added in the commit of interest, for RLI (since inserts cannot go into log files) we do the following (see the sketch after this list):
log files with data blocks: we can skip processing them. Whenever compaction kicks in for the file slice of interest, we will ensure deleted records are brought in sync with RLI (if they were not added to delete blocks).
log files with delete blocks: we just read the record keys from the delete log block and generate delete records for RLI.
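A hedged sketch of the log file handling; readKeysFromDeleteBlocks is a hypothetical helper that scans only the delete blocks of a log file:

```java
import java.util.Collections;
import java.util.Set;

public abstract class LogFileRliMapping {

  Set<String> deletedKeysForRli(String logFilePath, boolean hasDeleteBlocks) {
    if (!hasDeleteBlocks) {
      // Data-block-only log file: nothing to do for RLI here; deletes realized as
      // updates by a custom payload are reconciled at the next compaction.
      return Collections.emptySet();
    }
    // Delete blocks carry the deleted record keys directly.
    return readKeysFromDeleteBlocks(logFilePath);
  }

  // Hypothetical helper that reads record keys only from the delete blocks of a log file.
  abstract Set<String> readKeysFromDeleteBlocks(String logFilePath);
}
```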
Let's skim through every operation:
Regular core operations like bulk_insert, insert, upsert and delete follow the logic described above.
Compaction has been called out above.
For clustering, every record is a new insert since the file group ID changes (this updates RLI).
For insert_overwrite: we fetch the list of replaced files from HoodieReplaceCommitMetadata to find the record keys to be deleted from RLI. Also, for the new files that were added, we do an on-demand read of the base files and create new insert records for RLI (see the sketch after this list).
For insert_overwrite_table: we simplified this operation by re-creating the MDT fully.
For delete_partition: pretty similar to insert_overwrite, except that we may not have any new records to ingest into RLI; we only delete records from RLI.
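A hedged sketch of the insert_overwrite path; all helpers (readRecordKeysFromFileGroup, readRecordKeysFromBaseFile, rliDeleteRecord, rliInsertRecord) are hypothetical stand-ins for the actual file readers and RLI record constructors:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public abstract class InsertOverwriteRliMapping {

  List<Object> rliRecordsForInsertOverwrite(Set<String> replacedFileGroupIds,
                                            Map<String, String> newBaseFileToFileGroupId) {
    List<Object> rliRecords = new ArrayList<>();
    // Every key that lived in a replaced file group becomes a delete record in RLI.
    for (String fileGroupId : replacedFileGroupIds) {
      for (String key : readRecordKeysFromFileGroup(fileGroupId)) {
        rliRecords.add(rliDeleteRecord(key));
      }
    }
    // Every key in a newly written base file becomes an insert record pointing at the new file group.
    newBaseFileToFileGroupId.forEach((baseFilePath, fileGroupId) ->
        readRecordKeysFromBaseFile(baseFilePath)
            .forEach(key -> rliRecords.add(rliInsertRecord(key, fileGroupId))));
    return rliRecords;
  }

  // Hypothetical helpers standing in for the actual file readers / RLI record constructors.
  abstract Set<String> readRecordKeysFromFileGroup(String fileGroupId);
  abstract Set<String> readRecordKeysFromBaseFile(String baseFilePath);
  abstract Object rliDeleteRecord(String recordKey);
  abstract Object rliInsertRecord(String recordKey, String fileGroupId);
}
```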
Impact
Robust RLI updates even with Spark task retries.
Risk level
medium