Context: Currently, `CDCReader.changesToDF` relies on `DeltaHistoryManager.getCommits` to get the list of commits and their timestamps. Since `DeltaHistoryManager.getCommits` is aware of neither In-Commit Timestamps (ICT) nor Coordinated Commits, it returns either the wrong timestamp or no timestamp at all for certain commits.
This PR updates `CDCReader.changesToDF` so that it relies on `DeltaHistoryManager.getCommits` only for non-ICT commits. The rest of `CDCReader.changesToDF` relies on the output of `deltaLog.getChanges`, which is already Coordinated Commits-aware. The function also already extracts the `CommitInfo` of each of these commits, which we now reuse to obtain the In-Commit Timestamp for the relevant commits. Since these actions were already being read in the function, this PR adds no additional IO.
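The timestamp-resolution idea described above can be illustrated with a minimal sketch under simplified assumptions. `CommitInfoLite`, `FileChange`, `TimestampResolution.resolve`, and the `fallback` parameter are hypothetical stand-ins, not Delta's actual classes or the code in this PR: `changes` plays the role of the per-version actions returned by `deltaLog.getChanges`, and `fallback` plays the role of a history lookup such as `DeltaHistoryManager.getCommits`.

```scala
// Hypothetical, simplified stand-ins for Delta actions (not the real classes).
sealed trait Action
final case class CommitInfoLite(inCommitTimestamp: Option[Long]) extends Action
final case class FileChange(path: String) extends Action

object TimestampResolution {
  /**
   * Returns a commit timestamp for every version in `changes`.
   *  - Versions whose CommitInfo carries an In-Commit Timestamp use it directly,
   *    so no extra IO is needed for them.
   *  - Only the remaining (non-ICT) versions are passed to `fallback`, which
   *    stands in for a history lookup such as DeltaHistoryManager.getCommits.
   */
  def resolve(
      changes: Seq[(Long, Seq[Action])],
      fallback: Seq[Long] => Map[Long, Long]): Map[Long, Long] = {
    // ICT timestamps taken from the CommitInfo already read with each version
    val ictTimestamps: Map[Long, Long] = changes.flatMap { case (version, actions) =>
      actions.collectFirst { case CommitInfoLite(Some(ts)) => version -> ts }
    }.toMap
    // Versions without an ICT still need the history-based lookup
    val nonIctVersions = changes.map(_._1).filterNot(ictTimestamps.contains)
    val historyTimestamps =
      if (nonIctVersions.isEmpty) Map.empty[Long, Long] else fallback(nonIctVersions)
    ictTimestamps ++ historyTimestamps
  }
}
```

The point of the sketch is that the history lookup is consulted only for versions whose `CommitInfo` has no In-Commit Timestamp; everything else comes from actions that were already in hand.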
This PR also updates `DeltaSource` so that it propagates `CommitInfo` actions to `CDCReader.changesToDF`. These `CommitInfo` actions are used only to obtain In-Commit Timestamps and are filtered out afterward.
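The `DeltaSource` change can be pictured with a small sketch: a `CommitInfo` travels alongside a version's other actions only so that its In-Commit Timestamp can be read, and it is then dropped before the remaining actions are turned into change rows. `CommitInfoLite`, `FileChange`, `VersionChanges`, and `CommitInfoPassThrough.split` are hypothetical names for illustration; the real `DeltaSource` and `CDCReader` code may be structured differently.

```scala
// Hypothetical, simplified stand-ins for Delta actions (not the real classes).
sealed trait Action
final case class CommitInfoLite(inCommitTimestamp: Option[Long]) extends Action
final case class FileChange(path: String) extends Action

/** One version's changes: its ICT (if any) plus the actions left after filtering. */
final case class VersionChanges(version: Long, ict: Option[Long], fileActions: Seq[Action])

object CommitInfoPassThrough {
  def split(version: Long, actions: Seq[Action]): VersionChanges = {
    // Use the propagated CommitInfo only to read the In-Commit Timestamp...
    val ict = actions.collectFirst { case CommitInfoLite(Some(ts)) => ts }
    // ...then filter it out so it never reaches the CDC output as a change
    val fileActions = actions.filterNot(_.isInstanceOf[CommitInfoLite])
    VersionChanges(version, ict, fileActions)
  }
}
```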
How was this patch tested?
- Added a Coordinated Commits variant of `DeltaCDCScalaSuite` with a batch size of 10.
- Added new test cases to `InCommitTimestampSuite`. More tests will follow.
Does this PR introduce any user-facing changes?
No