Upstream SPARK-XXXXX ticket and PR link (if not applicable, explain)
This PR is a modified version of an unmerged upstream PR (which the author will reopen soon): https://github.com/apache/spark/pull/29625. However, since we are not fully caught up with the 3.0 branch and we need this feature internally, I have adapted it to work on our branch with the fewest changes possible.
What changes were proposed in this pull request?
Quick background: when there are multiple files in a single bucket, Spark does not propagate the sort ordering to the `FileSourceScanExec` node. This means that if a parent operator requires a child ordering equal to the file ordering within the buckets, we still end up sorting every partition. This PR propagates the sort ordering and creates an RDD that produces rows by merging the sorted per-file iterators.
The diff looks a bit large but the actual changes are minimal:
`FileScanRDD` used to contain all the logic for producing the next rows. In this PR, we instead pass it a `ScanMode`, which delegates to a different iterator when we need a sorted bucketed scan.
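Not part of the diff, but to illustrate the shape of the delegation: the mode decides whether the per-file iterators are simply concatenated (the old behaviour) or heap-merged. This is a hedged Python sketch; the names `ScanMode` and `iterator_for` mirror the PR loosely, and the bodies stand in for the real row/batch iterators.

```python
from enum import Enum, auto
from itertools import chain
import heapq

class ScanMode(Enum):
    ROW = auto()            # stand-in for the row/batch scans
    SORTED_BUCKET = auto()  # the new sorted bucketed scan

def iterator_for(mode, file_iters):
    # Sorted bucketed scans get a merging iterator that preserves the
    # per-file sort order; everything else keeps the old concatenation.
    if mode is ScanMode.SORTED_BUCKET:
        return heapq.merge(*file_iters)
    return chain(*file_iters)

# Two sorted "files" in one bucket:
list(iterator_for(ScanMode.SORTED_BUCKET, [iter([1, 3]), iter([2, 4])]))  # [1, 2, 3, 4]
list(iterator_for(ScanMode.ROW, [iter([1, 3]), iter([2, 4])]))            # [1, 3, 2, 4]
```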
`FileScanIterators` contains a `BaseFileScanIterator` and three subclasses: one for row-based scans, one for columnar batch scans, and one for the sorted bucketed scan. The `BaseFileScanIterator` is a literal copy-paste of what used to be in `FileScanRDD`, except for https://github.com/palantir/spark/pull/730/files#diff-c64b05200405088131067d856ed7d9d29290d47881018c7a7b0db4668ddda9d3R140-R143.
The `next` methods implemented by `FileRowScanIterator` and `FileBatchScanIterator` are also exact copy-pastes from `FileScanRDD`, except that we have removed the `if-else` check there and split it into two different iterators, similar to the upstream PR. This is purely cleanup, and I can merge them back if you prefer.
The `next` method in `FileSortedBucketScanIterator` is the core logic of this change; it is a literal copy-paste from the upstream PR. It holds a min-heap of the next element from each backing iterator and returns the head. Note that this requires a higher memory footprint for the vectorized readers, since it keeps the next batch from every backing iterator in memory.
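For reviewers unfamiliar with the technique, the min-heap merge can be sketched in a few lines of Python (illustrative only; the real iterator works over `InternalRow`s/batches and its own comparator, and `merge_sorted_iterators` is a hypothetical name):

```python
import heapq

def merge_sorted_iterators(iterators, key=lambda row: row):
    """k-way merge: keep the head of each backing iterator in a min-heap
    and repeatedly yield the smallest, refilling from that iterator."""
    heap = []
    for idx, it in enumerate(iterators):
        head = next(it, None)
        if head is not None:
            # idx breaks ties, so the rows themselves never get compared
            heapq.heappush(heap, (key(head), idx, head, it))
    while heap:
        _, idx, row, it = heapq.heappop(heap)
        yield row
        nxt = next(it, None)
        if nxt is not None:
            heapq.heappush(heap, (key(nxt), idx, nxt, it))

# Two sorted files in the same bucket merge without a full sort:
list(merge_sorted_iterators([iter([1, 4, 7]), iter([2, 3, 9])]))
# [1, 2, 3, 4, 7, 9]
```

The memory-footprint caveat above falls out of this shape: with vectorized readers each heap entry is a whole column batch rather than a single row.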
Whatever conflicts this causes with our 3.0 branch, I will take responsibility for resolving. Once the upstream PR has merged and we are up to date with 3.0, I will revert this PR and cherry-pick the upstream one.
How was this patch tested?
Unit tests. The feature is also hidden behind a flag, like the upstream PR, so we can enable it selectively before rolling it out more widely.
cc @mattsills