Open nsivabalan opened 1 week ago
High-level, it seems we introduce many MDT-specific APIs and components just to avoid the Spark DAG retries, which I don't like; let's try to avoid this. Also think through the effect on other engines like Flink and Java.
@danny0405 Code/APIs aside.
Actually this is a general idea, not specific to Spark retry handling. I think this design is a lot more friendly to pure streaming engines like Flink. Don't you think? Otherwise, how would we update larger MT partitions like RLI from Flink?
I don't see any gains from this change for Flink; there is still no chance to plug in the Flink RLI operators on the fly flexibly. Actually, the union of the DT/MDT RDDs makes the pipeline even more complex.
Change Logs
Re-writing the write DAG to write to both DT and MDT using the same stage boundaries. This will avoid the out-of-sync issues that can crop up today and currently need special handling. The intention behind this DAG rewrite is to ensure we write to both the DT and the MDT using a single DAG w/o any breaks in between.
This is a WIP patch which might get split into multiple patches depending on feedback.
Before we go into the new DAG, let's revisit how the current DAG looks. Here is a pictorial representation of the write DAG as of today.
Given the current DAG, we feel we could do a streaming write to the MDT directly, relying on the writes to the data table w/o triggering any actions. So, in case of task retries or stage retries, our marker reconciliation will automatically take care of cleaning up any spurious data files.
With that in mind, here is what the proposal looks like for the new dag.
I am opening up this patch to get feedback on the design while we iterate towards a full-blown implementation.
Let's go over it one piece at a time.
1. Enabling NBCC for MT
We are enabling NBCC (Non-Blocking Concurrency Control) and multi-writer support for the MDT so that multiple writers can write concurrently to the metadata table. This is a pre-requisite since the data table could have multiple writers and each of their DAGs could be running concurrently. The previous DAG did not need this, but the redesigned DAG requires NBCC for the MDT. Draft patch: https://github.com/apache/hudi/pull/12209. Changes have been incorporated in this patch as well.
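For context, here is a minimal sketch of what opting into NBCC looks like from write configs. These are the standard Hudi concurrency config keys; how exactly this patch applies them for the internal MDT writer is an implementation detail of the PR, and the lock provider shown is just an example.

```java
import java.util.Properties;

class NbccConfigSketch {
  static Properties nbccWriterProps() {
    Properties props = new Properties();
    // Non-blocking concurrency control so concurrent writers do not block each other.
    props.setProperty("hoodie.write.concurrency.mode", "NON_BLOCKING_CONCURRENCY_CONTROL");
    // NBCC still expects a lock provider for lightweight timeline operations;
    // the in-process provider is used here purely as an example.
    props.setProperty("hoodie.write.lock.provider",
        "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
    return props;
  }
}
```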
2. Removing auto commit flow to reduce complexity.
In general we have two flows w.r.t. write commits, namely the auto-commit-enabled and auto-commit-disabled flows. We are unifying this and plan to support only the auto-commit-disabled flow. All user-facing writers (batch writers and streaming writers) already use the auto-commit-disabled flow, so this should not have any impact on end users.
It is just that a lot of tests are written using the auto-commit-enabled flow and those need to be fixed to use the auto-commit-disabled flow. Draft patch: https://github.com/apache/hudi/pull/12204. Again, changes have been incorporated in this patch as well.
Auto commit enabled flow
Auto commit disabled flow
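To make the difference concrete, here is a rough sketch of the two flows against the existing write client APIs (the auto-commit-enabled behavior is shown only for contrast; it is the flow being removed):

```java
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;

class AutoCommitFlows {
  // Auto commit ENABLED (being removed): the write call finalizes the commit itself.
  static void autoCommitEnabled(SparkRDDWriteClient client, JavaRDD<HoodieRecord> records) {
    String instant = client.startCommit();
    client.upsert(records, instant); // commit is completed inside the write call
  }

  // Auto commit DISABLED (the only flow kept by this patch): caller commits explicitly.
  static void autoCommitDisabled(SparkRDDWriteClient client, JavaRDD<HoodieRecord> records) {
    String instant = client.startCommit();
    JavaRDD<WriteStatus> statuses = client.upsert(records, instant); // lazy, DAG not triggered yet
    client.commit(instant, statuses); // explicit commit triggers the DAG and finalizes the write
  }
}
```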
3. MDT Writer instance lifecycle:
We need one instance of the MDT writer and MDT write client per ingestion commit / table service commit in the data table w/ the new DAG design. So, we are introducing a Map of instantTime -> HoodieTableMetadataWriter in BaseHoodieWriteClient. The expectation is that a given commit in the data table will instantiate a new HoodieTableMetadataWriter and use it throughout the lifecycle of the commit. In the end, the HoodieTableMetadataWriter will be closed while wrapping up the commit in the data table. Ref: https://github.com/apache/hudi/pull/12236/files#diff-e44115f0edf69ae17a0249dc1c90e99211f6d9dd072a0e769668c6dd9fe68638 and https://github.com/apache/hudi/pull/12236/files#diff-14df5e8357e75ba5e6f6e0144a932b70382245f5b70ca8abdcab35fbb4aa43a3
In case of non-ingestion commits like clean, rollback and restore, we use the old way: we instantiate a new HoodieTableMetadataWriter, apply the changes and close it right away. No changes for these actions.
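A minimal sketch of the intended per-commit lifecycle follows; the map and hook names below are illustrative, and the actual wiring lives in BaseHoodieWriteClient in the linked diff:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;

class MetadataWriterLifecycleSketch {
  // One metadata writer per in-flight data table commit, keyed by instant time.
  private final Map<String, HoodieTableMetadataWriter> metadataWriters = new HashMap<>();

  void onStartCommit(String instantTime, HoodieTableMetadataWriter writer) {
    // Instantiated once when the data table commit starts and reused for its whole lifecycle.
    metadataWriters.put(instantTime, writer);
  }

  HoodieTableMetadataWriter forInstant(String instantTime) {
    return metadataWriters.get(instantTime);
  }

  void onCommitComplete(String instantTime) throws Exception {
    // Closed while wrapping up the commit in the data table.
    HoodieTableMetadataWriter writer = metadataWriters.remove(instantTime);
    if (writer != null) {
      writer.close();
    }
  }
}
```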
4. Notes on ingestion writes dag.
Let's go over the upsert operation in the data table.
After we write to the data table, we have an RDD of data table write statuses.
We have introduced new APIs in HoodieTableMetadataWriter (prepareAndWriteToMDT) to write to the MDT directly based on the RDD from the data table.
We wanted to keep the FILES partition out of this write so that we can write it at the very end, after reconciling the commit metadata for the data table. So, every other partition or index in the metadata table gets written using this API.
This method (prepareAndWriteToMDT) will return the metadata table's RDD of write statuses.
We stitch both sets of write statuses together and send them back. So, writeClient.upsert() will return an RDD which has a mix of data table write statuses and metadata table write statuses.
Btw, do remember that the DAG is not yet triggered until the next API (c) is called. In other words, just by calling writeClient.upsert(), even the data files for the data table will not be written.
Ref code to review: https://github.com/apache/hudi/pull/12236/files#r1839032523 https://github.com/apache/hudi/pull/12236/files#r1839024169 https://github.com/apache/hudi/pull/12236/files#r1839020184 https://github.com/apache/hudi/pull/12236/files#r1839025342
c. User calls writeClient.commit(commitTime, return value from (b) above). Let's unpack what happens within this call.
What this api does is:
Using the HoodieCommitMetadata from the data table, we prepare and write to the FILES partition in the MDT.
Stitch the List of write statuses from c.i (i.e. partialMdtWriteStats) and the List of write statuses from the FILES partition writes above, and complete the commit to the metadata table. This means we would have performed marker reconciliation for the metadata table as well as part of this step, i.e. deleted any spurious files in the MDT.
v. Wrap up the commit in Data table.
Please check out the changes in SparkRDDWriteClient, HoodieTableMetadataWriter, HoodieBackedTableMetadataWriter and SparkHoodieBackedTableMetadataWriter.
In this patch, we have fixed the upsert() operation to test this DAG and it works as expected, i.e. writes to both the data table and the metadata table happen within a single DAG w/o any breaks. The write to the FILES partition in the MDT happens at the end, and finally we wrap up the commit in both the metadata table and the data table.
Ref code: https://github.com/apache/hudi/pull/12236/files#r1839037534 https://github.com/apache/hudi/pull/12236/files#r1839039325 https://github.com/apache/hudi/pull/12236/files#r1839040074
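Putting the ingestion pieces together, the intended end-to-end flow reads roughly as below. Only startCommit/upsert/commit are called by the user; the partial MDT writes and FILES stitching happen inside them per this patch.

```java
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;

class StreamingDagFlowSketch {
  static void upsertWithStreamingMdtWrites(SparkRDDWriteClient client, JavaRDD<HoodieRecord> records) {
    // (a) start the commit on the data table
    String instant = client.startCommit();

    // (b) upsert() now returns a single RDD holding BOTH data table and metadata
    // table WriteStatus (all MDT partitions except FILES). Nothing is executed
    // yet -- the Spark DAG is still lazy at this point.
    JavaRDD<WriteStatus> dtAndMdtStatuses = client.upsert(records, instant);

    // (c) commit() triggers the whole DAG once: data files and MDT files are
    // written, the FILES partition is written from the reconciled commit
    // metadata, markers are reconciled for both tables, and both commits complete.
    client.commit(instant, dtAndMdtStatuses);
  }
}
```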
5. Metadata Partitioner:
One tricky part in achieving the above is the design of the metadata table partitioner. If we use the out-of-the-box UpsertPartitioner, the workload profile building stage will trigger an action. So, here is what we have done to circumvent that DAG trigger: while initializing the HoodieTableMetadataWriter itself, we already know which partitions in the MDT are enabled and the file group count for each. We use that info to build SparkMetadataUpsertPartitioner. All records are expected to be tagged w/ their fileID location by the time we reach the metadata table upsertPrepped call, so we leverage that to decide the Spark partition. By this trick, we completely avoid triggering the DAG and keep it streaming from the data table writes all the way to the metadata table writes.
Ref code: https://github.com/apache/hudi/pull/12236/files#diff-c13aba8c32dad9b38d5b82bd5f6c99c26b932d79d89e42ef83c8206b5e3005db
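A minimal sketch of the idea behind SparkMetadataUpsertPartitioner: the mapping below is simplified and the real partitioner in the diff derives it from the enabled MDT partitions and their file group counts.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.Partitioner;

// Sketch: a partitioner built purely from information known when the metadata
// writer is initialized, so no Spark action is needed to build a workload profile.
class MetadataUpsertPartitionerSketch extends Partitioner {
  private final Map<String, Integer> fileIdToSparkPartition;

  MetadataUpsertPartitionerSketch(Map<String, Integer> fileIdToSparkPartition) {
    this.fileIdToSparkPartition = new HashMap<>(fileIdToSparkPartition);
  }

  @Override
  public int numPartitions() {
    return fileIdToSparkPartition.size();
  }

  @Override
  public int getPartition(Object key) {
    // Records are already tagged with their MDT file group (fileId) by the time
    // upsertPrepped is called, so the key can be the tagged fileId itself.
    Integer partition = fileIdToSparkPartition.get(key.toString());
    return partition == null ? 0 : partition;
  }
}
```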
6. UpsertPreppedPartial:
Based on the DAG re-design, we are writing to the metadata table twice using the same delta commit time. None of our APIs in the writeClient are designed to work that way, so we are introducing upsertPreppedPartial to assist in this case. We have validations in place to ensure this is used only for metadata table writes. So, it is feasible to call writeClient.startCommit(t10.dc), writeClient.upsertPreppedPartial(batch1), writeClient.upsertPreppedPartial(batch2) and finally writeClient.commit(t10.dc..)
Ref code: https://github.com/apache/hudi/pull/12236/files#r1839046857
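Expressed as code, the allowed call sequence would look roughly like this; upsertPreppedPartial is the new method proposed here, and the exact signatures are illustrative.

```java
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;

class PartialUpsertSketch {
  // Two partial prepped upserts to the MDT under the SAME delta commit time,
  // followed by a single commit. Validations restrict this to metadata table writes.
  static void writeInTwoBatches(SparkRDDWriteClient mdtWriteClient,
                                JavaRDD<HoodieRecord> batch1,
                                JavaRDD<HoodieRecord> batch2,
                                String deltaCommitTime) { // e.g. "t10.dc"
    mdtWriteClient.startCommitWithTime(deltaCommitTime);
    JavaRDD<WriteStatus> batch1Statuses = mdtWriteClient.upsertPreppedPartial(batch1, deltaCommitTime);
    JavaRDD<WriteStatus> batch2Statuses = mdtWriteClient.upsertPreppedPartial(batch2, deltaCommitTime);
    mdtWriteClient.commit(deltaCommitTime, batch1Statuses.union(batch2Statuses));
  }
}
```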
7. New MT Upsert Commit Action Executor
We have introduced SparkMetadataTableUpsertCommitActionExecutor to assist w/ writing to the metadata table. This will receive the RDD of records, create an empty inflight file (empty workload profile), use SparkMetadataUpsertPartitioner to repartition the records, and write them out.
Ref code: https://github.com/apache/hudi/pull/12236/files#diff-4bc75096f7c67d1b0e302b4d2be9e01702db698821c6dfe8c7a28c1614b6f89f
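At a high level the executor does the following; this is a structural sketch with illustrative stubs, and the real logic lives in the linked diff.

```java
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;

// Structural sketch of SparkMetadataTableUpsertCommitActionExecutor's job
// (method names below are illustrative stubs, not the real internals).
class MetadataUpsertExecutorSketch {
  JavaRDD<WriteStatus> execute(JavaRDD<HoodieRecord> taggedMdtRecords) {
    // 1. Write an EMPTY inflight file / workload profile so that no Spark
    //    action is needed just to profile the incoming records.
    transitionToInflightWithEmptyProfile();
    // 2. Repartition records with SparkMetadataUpsertPartitioner, which is
    //    built from the precomputed MDT file group info (see section 5).
    JavaRDD<HoodieRecord> partitioned = repartitionWithMetadataPartitioner(taggedMdtRecords);
    // 3. Write the partitions out; the returned RDD of WriteStatus stays lazy,
    //    so the DAG is still not triggered here.
    return writePartitions(partitioned);
  }

  private void transitionToInflightWithEmptyProfile() {
    throw new UnsupportedOperationException("illustrative stub");
  }

  private JavaRDD<HoodieRecord> repartitionWithMetadataPartitioner(JavaRDD<HoodieRecord> records) {
    throw new UnsupportedOperationException("illustrative stub");
  }

  private JavaRDD<WriteStatus> writePartitions(JavaRDD<HoodieRecord> records) {
    throw new UnsupportedOperationException("illustrative stub");
  }
}
```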
8. Zooming in on prepareAndWriteToMDT Impl:
The high-level steps unpacked in this method impl are given below:
We have introduced MetadataIndexGenerator and SparkMetadataIndexGenerator to assist with preparing MDT records.
Note: We will be fixing all other write operations (bulk insert, insert, delete, insert overwrite, etc) in a similar fashion. In this patch, upsert() is implemented as a reference.
Ref code: https://github.com/apache/hudi/pull/12236/files#diff-4bad9395622cd04b5332d2dd195b7a407487cfcd43ed421f74c74ce1ead564b9 https://github.com/apache/hudi/pull/12236/files#diff-1f59d767b16a65c15c2795a81bbb46edf03b2c41152339259582cdf40059bdc8
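A rough sketch of the shape of this flow; names and signatures are illustrative, and the record preparation is what MetadataIndexGenerator / SparkMetadataIndexGenerator encapsulate.

```java
import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;

// Sketch of the prepareAndWriteToMDT flow.
class PrepareAndWriteToMdtSketch {
  JavaRDD<WriteStatus> prepareAndWriteToMDT(JavaRDD<WriteStatus> dataTableWriteStatuses,
                                            String instantTime,
                                            List<String> enabledMdtPartitions) {
    // 1. From each data table WriteStatus (which now carries the embedded stats),
    //    generate MDT records for every enabled partition except FILES
    //    (RLI, column stats, bloom filters, ...).
    JavaRDD<HoodieRecord> mdtRecords = dataTableWriteStatuses
        .flatMap(status -> toMetadataRecords(status, enabledMdtPartitions).iterator());
    // 2. Tag each record with its MDT file group and upsert it via the
    //    metadata table upsert executor; the returned RDD stays lazy.
    return upsertPreppedToMdt(mdtRecords, instantTime);
  }

  private static List<HoodieRecord> toMetadataRecords(WriteStatus status, List<String> enabledMdtPartitions) {
    throw new UnsupportedOperationException("illustrative stub");
  }

  private static JavaRDD<WriteStatus> upsertPreppedToMdt(JavaRDD<HoodieRecord> records, String instantTime) {
    throw new UnsupportedOperationException("illustrative stub");
  }
}
```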
11. Metadata table rollbacks:
Prior to this design, the failed-writes rollback policy (cleaner policy) was eager in the MDT: whenever we start a new commit in the MDT, if there are any pending commits, we auto-rollback them. But w/ NBCC, the failed-writes rollback policy is lazy. This means a heartbeat will be emitted by the MDT writeClient when the delta commit starts; if the process crashes, then later, when the cleaner in the MDT executes, it will check for failed writes (elapsed heartbeats) and trigger rollbacks. With the DAG re-design, we can't let this happen, so we are disabling this rollback by the cleaner for the metadata table. Any failed write in the metadata table will have a matching failed write in the data table, so the data table will have to trigger a rollback at some point (based on whether it is single writer or multi writer), and the same will trigger a rollback in the metadata table if the commit of interest exists there. So, it is safe to disable the auto rollbacks in the MDT.
Ref code: https://github.com/apache/hudi/pull/12236/files#diff-87151a25afbb138b6d05bc0f187b3111af5199c8f0f2628a8beb8a21c3c14980
12. WriteStatus changes:
Since we have a single RDD at the end of writeClient.upsert(), we have introduced a boolean flag in WriteStatus to denote whether it is for the data table or the metadata table. We use that to bucketize the statuses and prepare the HoodieCommitMetadata for the data table.
Ref code: https://github.com/apache/hudi/pull/12236/files#r1839056049
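For illustration, bucketizing the unioned statuses could look like the sketch below; the isMetadataTable() accessor name is assumed, as the description only states that a boolean flag was added.

```java
import org.apache.hudi.client.WriteStatus;
import org.apache.spark.api.java.JavaRDD;

class WriteStatusBucketizerSketch {
  // Split the unioned RDD returned by writeClient.upsert() back into the
  // data table and metadata table halves using the new flag on WriteStatus.
  static JavaRDD<WriteStatus> dataTableOnly(JavaRDD<WriteStatus> allStatuses) {
    return allStatuses.filter(status -> !status.isMetadataTable());
  }

  static JavaRDD<WriteStatus> metadataTableOnly(JavaRDD<WriteStatus> allStatuses) {
    return allStatuses.filter(status -> status.isMetadataTable());
  }
}
```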
13. DT Compaction
If you have gotten a grasp of the ingestion commit DAG, let's move on to compaction. I have not fixed data table clustering yet in this patch, but the changes to compaction should pretty much give you an idea.
Let's take a peek at how the compaction control flow looks w/ the new DAG. We are not going to touch the scheduling of compaction w/ this DAG rewrite exercise; only during compaction execution will we touch the MDT.
So, tableServiceWriteClient / writeClient.compact() will return a HoodieWriteMetadata (compactionWriteMetadata) which contains the RDD of write statuses for the compaction instant time. The DAG is not yet triggered.
Changes for ingestion writes were mostly straightforward w/ the revamped dag. But for compaction, we had to make some changes.
Note: As you can see, we are changing the compaction DAG here. For example, even if someone disables the metadata table completely, the compaction execution uses a different flow w/ this revamp. So, if you are reviewing, do pay close attention to these code blocks.
Ref code: https://github.com/apache/hudi/pull/12236/files#r1839059354 https://github.com/apache/hudi/pull/12236/files#r1839062145 https://github.com/apache/hudi/pull/12236/files#r1839063857
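In code, the intended compaction execution flow reads roughly as follows. compact() is the existing client entry point; the exact shape of the commit step under the revamp is per the patch, and the call below is shown only to indicate where the DAG fires.

```java
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;

class CompactionFlowSketch {
  static void executeCompaction(SparkRDDWriteClient tableServiceWriteClient, String compactionInstantTime) {
    // Scheduling is untouched by this patch; only execution changes.
    // compact() returns the write metadata for the compaction instant; the
    // underlying RDD of WriteStatus is still lazy at this point.
    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionWriteMetadata =
        tableServiceWriteClient.compact(compactionInstantTime);

    // Completing the compaction commit is what triggers the DAG: compacted base
    // files, the non-FILES MDT partitions, and finally the FILES partition are
    // all written within this one execution. (Commit call shape is illustrative;
    // the MDT stitching happens inside it per the patch.)
    HoodieCommitMetadata commitMetadata = compactionWriteMetadata.getCommitMetadata().get();
    tableServiceWriteClient.commitCompaction(compactionInstantTime, commitMetadata, Option.empty());
  }
}
```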
14. MDT stats generation:
As per the design, our intention is to generate all the stats required for MDT record generation in the data table write handles themselves and pass them along w/ the WriteStatus. FILES, RLI and col stats are taken care of in this patch. For col stats, prior to this patch, only the append handle was generating the stats; with this patch, the create handle and merge handle also generate col stats and attach them to the WriteStatus. The general idea here is to embed all required stats (col stats, bloom filter, functional index, secondary index) in the WriteStatus returned from the write handles in the data table.
Ref code: https://github.com/apache/hudi/pull/12236/files#diff-b85f7289adc6d3bd6ecc32feab502e2e95f70539e1186d479fd0815d58485f84 https://github.com/apache/hudi/pull/12236/files#diff-44402d60ebabe11a4826e9c8a548b1737b32b0664f6af3c379d933b899e295a8 https://github.com/apache/hudi/pull/12236/files#diff-63a77e05c924278c190061a1a18a992a7f9480af14f0f34f4328bf72ae673fe9
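A minimal sketch of the general idea of embedding stats in WriteStatus from the write handles; all helper and parameter names here are illustrative, and the real changes are in the create/merge/append handle diffs above.

```java
import java.util.List;
import java.util.Map;
import org.apache.hudi.client.WriteStatus;

// Sketch: each data table write handle attaches the stats the MDT needs to its
// WriteStatus while the file is being written, instead of recomputing them later.
class StatsEmbeddingSketch {
  static void onHandleClose(WriteStatus writeStatus,
                            Map<String, Object> columnRangeStats, // for the col stats partition
                            List<String> writtenRecordKeys) {     // for RLI / secondary index
    attachColumnStats(writeStatus, columnRangeStats);
    attachRecordKeys(writeStatus, writtenRecordKeys);
    // bloom filter metadata, functional index values, etc. would be attached the same way
  }

  private static void attachColumnStats(WriteStatus status, Map<String, Object> stats) {
    throw new UnsupportedOperationException("illustrative stub");
  }

  private static void attachRecordKeys(WriteStatus status, List<String> keys) {
    throw new UnsupportedOperationException("illustrative stub");
  }
}
```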
Things to flesh out/Pending items:
insert_overwrite
For the RLI partition, we might have to poll the FSV (file system view) to fetch the latest base files and then the record keys in them. Polling the FSV from executors is generally not recommended, and here we would need to do it for a lot of files, so we can't just delegate the work to one write handle. This needs to be figured out.
Tests:
Note: We will be introducing a flag to enable the new optimized DAG, and it will be turned off by default. The optimized DAG will be fully implemented for Spark first; Flink and Java will be taken up later.
Feel free to comment on the overall design or any code blocks in particular. Since this has a very large blast radius, we want to ensure we don't have any gaps in the design or impl.
In the next day or two, I will try to add "Notes to reviewer" as well to help w/ reviewing.
Impact
Describe any public API or user-facing feature change or any performance impact.
Risk level (write none, low medium or high below)
If medium or high, explain what verification was done to mitigate the risks.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Contributor's checklist