yma11 opened this issue 1 year ago.
@yma11 In fact, both of these PRs were proposed by our team. @YannByron and I have discussed a better architecture design for them offline. We agree with what you said, and we can take responsibility for optimizing this area and propose a better design as soon as possible.
We also encountered some problems with BatchScanExecTransformer, and the current implementation may also need optimization.
Hi @yma11, thank you for the invitation.
Here I will briefly explain that our upcoming design will mainly cover three parts of integrating lake formats with Gluten/Velox. IMO all of these problems are related to lake formats, but they can be solved independently:
Features such as column mapping. I think this part can be solved at the gluten-core layer, as in https://github.com/oap-project/gluten/pull/3376. But perhaps a better solution is to rewrite the Transformer rather than the SparkPlan (which is what the PR implements), since Gluten should return the original SparkPlan when fallback occurs.
As @yma11 mentioned, Velox plans to implement column mapping in the native layer (I am quite worried that this will make these native readers/datasources more complicated). Even so, I think we can still accept the current solution (rewriting the SparkPlan or the Transformer), for these reasons:
1) This solution is simple enough, and it is easy to switch to the other solution (where Velox handles column mapping) if we have to.
2) Users can start using it in the scenarios where they need it. After all, there is no exact schedule for supporting this feature in Velox.
3) Moreover, this solution provides a small plan-rewriting framework that can be reused when needed.
Currently Gluten/Velox already supports FileSourceScanExecTransformer (corresponding to DSV1, which Delta Lake uses) and BatchScanExecTransformer (corresponding to DSV2, which Iceberg uses). For DSV2, different datasources have different implementations of org.apache.spark.sql.connector.read.Scan. We have to build the GlutenPartition used by the native reader by parsing each implementation of Scan; that is https://github.com/oap-project/gluten/pull/3043 @liujiayi771 (this PR also allows Iceberg COW tables to use the native reader for the underlying storage file format, like ORC/Parquet; this is discussed below).
IMO, the key to this part is making the framework design more reasonable, clearer, and easier to extend (as @ulysses-you also said in https://github.com/oap-project/gluten/pull/3043#issuecomment-1709405827).
To support COW tables (Delta Lake non-DV tables, Hudi COW tables, Iceberg V1 tables, Paimon append-only tables), one native implementation approach that works across lake formats is to use the underlying file format readers (Parquet/ORC). This is the approach adopted by https://github.com/oap-project/gluten/pull/3043 and https://github.com/oap-project/gluten/pull/3376, and I personally think it is acceptable under the current circumstances. To support MOR tables, a separate datasource (e.g. IcebergDataSource) must be implemented in the Velox layer (obviously the Velox community needs to follow up here, as with Iceberg in https://github.com/facebookincubator/velox/issues/5977). I hope these separate datasource implementations will support COW tables at the same time (once that is done, it will be easy to switch the native implementation of COW tables over to them).
In addition, regarding the code framework (or the transformer hierarchy), our thoughts are consistent with what @yma11 mentioned above.
Next, we will propose the design (maybe as a draft) soon, cc @liujiayi771, so that we can discuss it in depth.
- This solution is brief enough and easy to switch this to that solution (that velox can deal with column mapping) if it has to.
- Let users use it first in the scenario they need. After all, there is no exact schedule to support this feature.
It's acceptable to continue with the current solution for features like column mapping, and then switch to Velox once it's supported, so that Gluten can be adopted quickly. But even so, a clear, detailed design needs to be finalized first so that the community can work together on it. Let's discuss more when the draft is ready.
these two PRs were all proposed by our team.
Have the two PRs been submitted?
Make Gluten/Velox support queries on lake formats (DeltaLake, Iceberg):
- Support both Copy-On-Write (COW) and Merge-On-Read (MOR) tables:
  - DeltaLake with/without Deletion Vectors
  - Iceberg V1/V2 tables
  - Hudi COW/MOR tables
- Support column-mapping mode.
Some design thoughts:
- In the Gluten project, gluten-core and the lake-format-related parts are split into separate modules, and the lake-format-specific logic is handled separately in its own module;
- Gluten supports lake formats in a plug-in manner: gluten-core should not depend on any lake format module;
- gluten-core provides some interfaces for extending it with lake-format-specific logic. Some of these interfaces are shown as follows:
gluten-core
  io/glutenproject/
    extension/
      trait RewritePlanRules
    execution/
      trait BaseDataSource
      trait BaseScanTransformer
Column mapping allows table columns and the underlying file (Parquet/ORC) columns to use different names. This enables schema evolution operations such as RENAME COLUMN and DROP COLUMNS without rewriting the underlying files.
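To make the name indirection concrete, here is a minimal Scala sketch under simplifying assumptions. A column is modeled as a hypothetical MappedField pair of logical and physical names, rows are plain maps, and the physical names (col-a1 and so on) are made up. It is not Delta's actual implementation. It only shows why a reader must request physical names from the file and relabel them with logical names, and why RENAME COLUMN and DROP COLUMN need no file rewrite.

```scala
// Hypothetical, simplified model of a column under column mapping:
// the logical name is what users see; the physical name is what is stored in the file.
final case class MappedField(logicalName: String, physicalName: String)

object ColumnMappingSketch {
  // Columns the Parquet/ORC reader must request from the data file.
  def physicalColumns(schema: Seq[MappedField]): Seq[String] =
    schema.map(_.physicalName)

  // Re-labels a row read from the file (keyed by physical name) with logical names.
  // File columns that are no longer in the table schema (e.g. after DROP COLUMN) are ignored.
  def toLogicalRow(schema: Seq[MappedField], fileRow: Map[String, Any]): Map[String, Any] =
    schema.flatMap(f => fileRow.get(f.physicalName).map(v => f.logicalName -> v)).toMap

  // RENAME COLUMN only changes metadata; the physical name, and thus the file, stays the same.
  def rename(schema: Seq[MappedField], from: String, to: String): Seq[MappedField] =
    schema.map(f => if (f.logicalName == from) f.copy(logicalName = to) else f)
}
```

After a rename, physicalColumns returns the same list as before, so existing data files remain readable unchanged.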
When we scan a table with column mapping enabled (for example, a Delta table), we have to handle the relationship between table columns and file columns. This can be achieved by rewriting the plan.
To handle this, RewritePlanRules is abstracted out. It provides the ability to rewrite a SparkPlan before the SparkPlan is transformed into a Gluten plan.
The interface looks like this:
trait RewritePlanRules {
  def rules: Seq[Rule[SparkPlan]]
}
Then a RewritePlan rule loads all implementations of RewritePlanRules first, and is injected into ColumnarOverrides.
case class RewritePlan(session: SparkSession) extends Rule[SparkPlan] {
  // 1. Load all implementations of [[RewritePlanRules]].
  val rules: Seq[Rule[SparkPlan]] = ...

  // 2. Apply these rules one by one.
  def apply(plan: SparkPlan): SparkPlan = {
    rules.foldLeft(plan) { (current, rule) =>
      rule(current)
    }
  }
}
We can then define DeltaRewritePlanRules in gluten-delta, with a rule that transforms each DeltaScan node into Project + FileScan.
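The rewrite described above can be sketched with a toy plan tree. Everything here (Plan, DeltaScan, FileScan, Project, Filter, PlanRule, rewritePlan) is a hypothetical, self-contained stand-in for SparkPlan, Rule[SparkPlan] and the proposed RewritePlanRules, not Gluten or Delta code. The Delta rule replaces each DeltaScan with a Project (aliasing physical names to logical names) over a FileScan that reads physical column names. rewritePlan folds every provider's rules over the plan in order, as RewritePlan.apply does.

```scala
object RewritePlanSketch {
  // Hypothetical miniature plan tree standing in for SparkPlan.
  sealed trait Plan
  // columns: (logical name, physical name) pairs from the table's column-mapping metadata.
  final case class DeltaScan(path: String, columns: Seq[(String, String)]) extends Plan
  final case class FileScan(path: String, columns: Seq[String]) extends Plan
  // aliases: (physical name, logical name) pairs, i.e. "SELECT physical AS logical".
  final case class Project(aliases: Seq[(String, String)], child: Plan) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan

  // Stand-in for Rule[SparkPlan].
  trait PlanRule { def apply(plan: Plan): Plan }

  // Stand-in for the proposed RewritePlanRules SPI.
  trait RewritePlanRules { def rules: Seq[PlanRule] }

  // Hypothetical gluten-delta provider: each DeltaScan becomes Project + FileScan.
  object DeltaRewritePlanRules extends RewritePlanRules {
    private object DeltaScanToProjectFileScan extends PlanRule {
      def apply(plan: Plan): Plan = plan match {
        case DeltaScan(path, columns) =>
          val (logical, physical) = columns.unzip
          Project(physical.zip(logical), FileScan(path, physical))
        case Filter(condition, child) => Filter(condition, apply(child))
        case Project(aliases, child)  => Project(aliases, apply(child))
        case other                    => other // only Delta scans are touched
      }
    }
    val rules: Seq[PlanRule] = Seq(DeltaScanToProjectFileScan)
  }

  // Mirrors RewritePlan.apply: fold every loaded rule over the plan, in order.
  def rewritePlan(plan: Plan, providers: Seq[RewritePlanRules]): Plan =
    providers.flatMap(_.rules).foldLeft(plan)((current, rule) => rule(current))
}
```

With no providers loaded, the plan passes through unchanged. This matches the plug-in goal that gluten-core works without any lake-format module.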
Currently, Spark supports four Scan types:
- HiveTableScanExec: used for Hive tables.
- BatchScanExec: used for v2 datasources, which are supported since Spark 3.0, e.g. Iceberg.
- FileSourceScanExec: used for HadoopFsRelation, e.g. Parquet, Delta, Hudi (COW without schema on read).
- RowDataSourceScanExec: used for datasources that return rows directly rather than files, e.g. Hudi.
Gluten supports the first three types.
For RowDataSourceScanExec, I think tables that use it need to provide a native version and integrate it into Velox, and then Gluten can work on them. The following design is also applicable to RowDataSourceScanExec, but at the implementation level we focus on BatchScanExec and FileSourceScanExec.
For BatchScanExec, the core abstraction in Spark is Batch, which defines two key things:
- planInputPartitions: the data splits to be processed by Spark tasks.
- createReaderFactory: for each data split, defines how to read records from it.
These two functions are functionally aligned with FileIndex and FileFormat, which are used in datasource v1.
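The correspondence above can be illustrated with a toy model. The sketch below is hypothetical: FileSplit, ToyInputPartition, ToyBatch and SizeSplittingBatch are simplified stand-ins, not Spark's real interfaces, which return an array of InputPartition and a PartitionReaderFactory. It only shows the division of labor. planInputPartitions decides what each task reads (the FileIndex side in DSv1), and createReaderFactory decides how a partition is read (the FileFormat side).

```scala
// Hypothetical, simplified stand-ins for Spark's DSv2 read API; the real interfaces are
// org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}.
final case class FileSplit(path: String, start: Long, length: Long)
final case class ToyInputPartition(splits: Seq[FileSplit])

trait ToyBatch {
  // Counterpart of Batch.planInputPartitions: which splits each Spark task processes.
  // In DSv1 this role is played by FileIndex (listing) plus file splitting.
  def planInputPartitions(): Seq[ToyInputPartition]

  // Counterpart of Batch.createReaderFactory: how to read records from one partition.
  // In DSv1 this role is played by FileFormat.
  def createReaderFactory(): ToyInputPartition => Iterator[String]
}

// Cuts every file into chunks of at most maxSplitBytes; one chunk per partition.
final class SizeSplittingBatch(files: Map[String, Long], maxSplitBytes: Long) extends ToyBatch {
  require(maxSplitBytes > 0, "maxSplitBytes must be positive")

  def planInputPartitions(): Seq[ToyInputPartition] =
    files.toSeq.sortBy(_._1).flatMap { case (path, size) =>
      (0L until size by maxSplitBytes).map { start =>
        ToyInputPartition(Seq(FileSplit(path, start, math.min(maxSplitBytes, size - start))))
      }
    }

  // The toy "reader" just describes the byte range it would scan.
  def createReaderFactory(): ToyInputPartition => Iterator[String] =
    partition => partition.splits.iterator.map(s => s"${s.path}[${s.start}, ${s.start + s.length})")
}
```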
This design refines the scan transformer structure as follows.
BaseDataSource
Encapsulates the necessary datasource information.
trait BaseDataSource {
  def dataSchema: StructType
  def partitionSchema: StructType
  def partitions: Array[InputPartition]
  def fileFormat: String
  def getInputFilePaths: Seq[String]
}
BaseScanTransformer
The core abstraction. Its key abilities:
- inherits BaseDataSource;
- for the Velox backend, provides a list of LocalFilesNode;
- for the ClickHouse backend, provides the doExecuteColumnar method.
trait BaseScanTransformer extends TransformSupport with SupportFormat with BaseDataSource {
  def filterExprs(): Seq[Expression]
  def outputAttributes(): Seq[Attribute]

  override lazy val supportsColumnar: Boolean = {
    // .......
  }

  override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = {
    // .......
  }

  override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = Seq.empty

  override protected def doValidateInternal(): ValidationResult = {
    // ......
  }

  override def doTransform(context: SubstraitContext): TransformContext = {
    // ......
  }

  override def doExecuteColumnar(): RDD[ColumnarBatch]

  // One LocalFilesNode per input partition (partitions comes from BaseDataSource).
  override def getLocalFilesNodes: Array[LocalFilesNode] = {
    partitions.map { p =>
      constructLocalFilesNode(p)
    }
  }

  def constructLocalFilesNode(input: InputPartition): LocalFilesNode
}
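As a rough, runnable illustration of how getLocalFilesNodes turns the data source's partitions into one node per partition, here is a self-contained sketch. DataSourceInfo, ScanTransformerLite, InputPart, SplitPart and LocalFilesNodeLite are hypothetical stand-ins for BaseDataSource, BaseScanTransformer, InputPartition and LocalFilesNode. The real classes depend on Spark and Gluten and carry more fields.

```scala
// Hypothetical simplified stand-ins: the real LocalFilesNode is a Gluten class that is
// serialized into Substrait ReadRel.LocalFiles, and partitions come from the Spark scan.
final case class SplitPart(path: String, start: Long, length: Long)
final case class InputPart(index: Int, files: Seq[SplitPart])
final case class LocalFilesNodeLite(
    index: Int,
    paths: Seq[String],
    starts: Seq[Long],
    lengths: Seq[Long],
    format: String)

// Plays the role of BaseDataSource.
trait DataSourceInfo {
  def partitions: Seq[InputPart]
  def fileFormat: String
}

// Plays the role of BaseScanTransformer: one node per input partition.
trait ScanTransformerLite extends DataSourceInfo {
  def getLocalFilesNodes: Seq[LocalFilesNodeLite] = partitions.map(constructLocalFilesNode)

  // Default unpacking; a datasource-specific transformer would override this.
  def constructLocalFilesNode(input: InputPart): LocalFilesNodeLite =
    LocalFilesNodeLite(
      input.index,
      input.files.map(_.path),
      input.files.map(_.start),
      input.files.map(_.length),
      fileFormat)
}

// Example: a plain Parquet scan with two partitions.
object ParquetScanLite extends ScanTransformerLite {
  val partitions: Seq[InputPart] = Seq(
    InputPart(0, Seq(SplitPart("a.parquet", 0L, 100L))),
    InputPart(1, Seq(SplitPart("b.parquet", 0L, 50L), SplitPart("c.parquet", 0L, 20L))))
  val fileFormat: String = "parquet"
}
```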
Note: for MOR tables, LocalFilesNode doesn't hold enough information. We therefore allow it to be extended by a child class, which is required to override the toProtobuf method. The Iceberg part below gives an example.
DatasourceScanTransformer
The base class for datasource v1.
abstract class DatasourceScanTransformer extends BaseScanTransformer with BaseDataSource {
  val relation: BaseRelation
  val tableIdentifier: Option[TableIdentifier]
}
- FileSourceScanTransformer: the scan implementation of datasource v1 based on HadoopFsRelation; extends DatasourceScanTransformer.
- DeltaLakeScanTransformer: the scan implementation of Delta tables, located in gluten-delta; extends FileSourceScanTransformer.
BatchScanTransformer
The base class for datasource v2.
class BatchScanTransformer(
    scan: Scan
    // ....
    // ....
) extends BaseScanTransformer {
  override def constructLocalFilesNode(input: InputPartition): LocalFilesNode = {
    //...
  }
}
IcebergScanTransformer
The scan implementation of Iceberg tables, located in gluten-iceberg; extends BatchScanTransformer.
Here is a detailed explanation.
case class DeleteFile(
    path: String,
    fileContent: Int, // 1 => position delete, 2 => equality delete
    fileFormat: ReadFileFormat,
    fileSize: Long,
    recordCount: Long,
    lowerBounds: Map[Int, String],
    upperBounds: Map[Int, String])
class IcebergLocalFilesNode(
    // ....
    private val deleteFiles: List[List[DeleteFile]] // the delete files of each data file in this node
) extends LocalFilesNode {
  override def toProtobuf(): ReadRel.LocalFiles = {
    //....
  }
}
class IcebergScanTransformer(
    scan: SparkBatchQueryScan
    // ....
    // ....
) extends BatchScanTransformer {
  // Convert an InputPartition to an IcebergLocalFilesNode.
  override def constructLocalFilesNode(input: InputPartition): LocalFilesNode = {
    // ...
  }
}
To support Iceberg V2 (MOR) tables, the data split needs to contain these delete files. We also upgrade the FileOrFiles message accordingly.
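A minimal sketch of the subclassing idea follows. It assumes a Map can stand in for the protobuf message. The Iceberg node reuses the base serialization and adds its delete files, so callers that hold a plain node reference get the extra information without branching on the format. FilesNode, IcebergFilesNode, DeleteFileLite and DeleteContent are hypothetical names. The content ids in the comments follow the 1/2 convention used in the DeleteFile case class above.

```scala
// Hypothetical simplified model: a base node that serializes itself, and an
// Iceberg subclass that also carries delete files (a Map stands in for protobuf).
sealed trait DeleteContent
case object PositionDeletes extends DeleteContent // content id 1
case object EqualityDeletes extends DeleteContent // content id 2

final case class DeleteFileLite(path: String, content: DeleteContent, format: String, recordCount: Long)

class FilesNode(val paths: Seq[String], val format: String) {
  // Stand-in for toProtobuf().
  def serialize(): Map[String, Any] = Map("paths" -> paths, "format" -> format)
}

// deleteFiles(i) holds the delete files that apply to paths(i).
class IcebergFilesNode(dataPaths: Seq[String], dataFormat: String, val deleteFiles: Seq[Seq[DeleteFileLite]])
    extends FilesNode(dataPaths, dataFormat) {
  require(deleteFiles.size == paths.size, "need one delete-file list per data file")

  // Same fields as the base node, plus the Iceberg-specific delete files.
  override def serialize(): Map[String, Any] = super.serialize() + ("deleteFiles" -> deleteFiles)
}
```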
There are many places that create scan transformer objects based on the file format or the scan, so a factory class for them needs to be provided. Among them, lake format scan transformers are instantiated by reflection.
object ScanTransformerFactory {
  def apply(fileSourceExec: FileSourceScanExec): FileSourceScanExecTransformer = {
    ...
  }

  def apply(batchScanExec: BatchScanExec): BatchScanTransformer = {
    ...
  }
}
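One way the reflection-based creation could look is sketched below. It assumes the factory keeps a map from Scan class names to transformer class names and falls back to the generic transformer when the lake-format jar is absent. SparkBatchQueryScan is Iceberg's Spark scan class. ScanTransformerHandle, GenericBatchScanTransformer, and the io.glutenproject.execution.IcebergScanTransformer class name are hypothetical placeholders.

```scala
// Hypothetical handle for a scan transformer.
trait ScanTransformerHandle { def name: String }

final class GenericBatchScanTransformer extends ScanTransformerHandle {
  def name: String = "generic"
}

object ScanTransformerFactorySketch {
  // Scan class -> lake-specific transformer class. The transformer class name is illustrative;
  // it only resolves when the optional lake-format jar is on the classpath.
  val defaultRegistry: Map[String, String] = Map(
    "org.apache.iceberg.spark.source.SparkBatchQueryScan" ->
      "io.glutenproject.execution.IcebergScanTransformer")

  def create(
      scanClassName: String,
      registry: Map[String, String] = defaultRegistry,
      loader: ClassLoader = getClass.getClassLoader): ScanTransformerHandle =
    registry
      .get(scanClassName)
      .flatMap { transformerClass =>
        try {
          val cls = Class.forName(transformerClass, true, loader)
          Some(cls.getDeclaredConstructor().newInstance().asInstanceOf[ScanTransformerHandle])
        } catch {
          case _: ClassNotFoundException => None // lake-format jar not present
        }
      }
      .getOrElse(new GenericBatchScanTransformer)
}
```

Because gluten-core only refers to the lake transformer by name, it compiles and runs without the lake-format module. This is the dependency direction the design asks for.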
Let me explain the fileFormat method of BaseDataSource in more detail.
To support COW tables of lake formats, we allow using the underlying file format until a native reader implementation is provided. So in the current solution this method can return ParquetReadFormat or OrcReadFormat.
To support MOR tables, a native reader is necessary, and it should support both COW and MOR tables. Once it exists, this method should return the specific ReadFileFormat, e.g. DeltaReadFileFormat or IcebergReadFileFormat.
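The two phases can be summarized as a small decision function. This is a hypothetical sketch: TableInfo, FileFormatChooser and the string keys are made up. Returning None stands for falling back to vanilla Spark, which is an assumption about how an unsupported case would be handled.

```scala
// Format names mirror the text above; everything else is illustrative.
sealed trait ReadFileFormat
case object ParquetReadFormat extends ReadFileFormat
case object OrcReadFormat extends ReadFileFormat
case object IcebergReadFileFormat extends ReadFileFormat
case object DeltaReadFileFormat extends ReadFileFormat

final case class TableInfo(lakeFormat: String, underlyingFormat: String, isMergeOnRead: Boolean)

object FileFormatChooser {
  // nativeLakeReaders: lake formats whose dedicated native reader is available.
  def fileFormat(t: TableInfo, nativeLakeReaders: Set[String]): Option[ReadFileFormat] =
    if (nativeLakeReaders.contains(t.lakeFormat)) lakeReader(t.lakeFormat) // handles COW and MOR
    else if (t.isMergeOnRead) None // MOR needs a native lake reader; assume fallback
    else t.underlyingFormat match { // COW: read the data files directly
      case "parquet" => Some(ParquetReadFormat)
      case "orc"     => Some(OrcReadFormat)
      case _         => None
    }

  private def lakeReader(lakeFormat: String): Option[ReadFileFormat] = lakeFormat match {
    case "iceberg" => Some(IcebergReadFileFormat)
    case "delta"   => Some(DeltaReadFileFormat)
    case _         => None
  }
}
```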
Corresponding to the LocalFilesNode extension, the protobuf FileOrFiles message is modified to extend support for lake formats.
Because different lake formats and table types require different parameters, we need to abstract a separate ReadOptions message for each lake format to pass its specific information. For example, an IcebergReadOptions message is added to the FileOrFiles message to convey Iceberg-specific information.
Due to the preference of the Velox community, the DeleteFile information is likely to be serialized as a string for transmission, so the DeleteFile section in the protobuf may be changed to a string.
message IcebergReadOptions {
  enum FileContent {
    // proto3 requires the first enum value to be zero; 0 is Iceberg's content id for data files.
    DATA = 0;
    POSITION_DELETES = 1;
    EQUALITY_DELETES = 2;
  }
  message DeleteFile {
    FileContent fileContent = 1;
    string filePath = 2;
    oneof file_format {
      ParquetReadOptions parquet = 3;
      OrcReadOptions orc = 4;
    }
    uint64 fileSize = 5;
    uint64 recordCount = 6;
  }
  oneof file_format {
    ParquetReadOptions parquet = 1;
    OrcReadOptions orc = 2;
  }
  repeated DeleteFile delete_files = 3;
}
On the native side, after receiving the protobuf message, we need to construct a different SplitInfo based on the file_format recorded in the protobuf. Similarly, on the native side each lake format needs to construct an xxSplitInfo that inherits from SplitInfo to pass additional information. This information is used to construct the corresponding HiveConnectorSplit in WholeStageResultIteratorFirstStage.
As on the Java side, we need to abstract the conversion from SplitInfo to HiveConnectorSplit into a method of SplitInfo itself: SplitInfo is converted to HiveConnectorSplit, while IcebergSplitInfo is converted to HiveIcebergSplit, etc.
struct SplitInfo {
  virtual ~SplitInfo() = default;

  /// Whether the split comes from arrow array stream node.
  bool isStream = false;
  /// The partition index.
  uint32_t partitionIndex;
  /// The partition columns associated with partitioned table.
  std::vector<std::unordered_map<std::string, std::string>> partitionColumns;
  /// The file paths to be scanned.
  std::vector<std::string> paths;
  /// The file starts in the scan.
  std::vector<uint64_t> starts;
  /// The lengths to be scanned.
  std::vector<uint64_t> lengths;
  /// The file format of the files to be scanned.
  dwio::common::FileFormat format;

  /// Builds the Velox connector split (a HiveConnectorSplit by default).
  /// Virtual so that lake-format subclasses can return their own split type.
  virtual std::shared_ptr<velox::connector::ConnectorSplit> getConnectorSplit() const;
};

struct IcebergSplitInfo : public SplitInfo {
  /// Delete files associated with the data files of this split.
  std::unordered_map<std::string, std::vector<DeleteFile>> deleteFiles;

  /// Builds a HiveIcebergSplit that also carries the delete files.
  std::shared_ptr<velox::connector::ConnectorSplit> getConnectorSplit() const override;
};
@yma11 @weiting-chen please help review this; we look forward to your feedback.
Thanks a lot @YannByron for the detailed design!
And then, A RewritePlanrule will load all the implementations of RewritePlanRules first, and inject this to ColumnarOverrides.
This looks like a very common feature. Did you already have some thoughts on the way to implement this? By a service loader or some kind of reflection tools? Also, should we care about the applying order for the plugged rules?
BatchScanTransformer: the base class for datasource v2.
The hierarchy of datasource transformers in Gluten has an issue: they extend vanilla Spark's case classes. For example, BatchScanExecTransformer inherits from case class BatchScanExec, and FileSourceScanExecTransformer inherits from case class FileSourceScanExec. With the new BaseScanTransformer -> DatasourceScanTransformer -> BatchScanTransformer design, do you mean to completely remove the problematic case-class-extending practice from the code base?
Corresponding to the LocalFilesNode expansion, modify the protobuf FileOrFiles message to extend support for lake formats.
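Two questions here:
- Was all the needed Velox code already contributed to upstream Velox?
- Do you already have some thoughts to organize new lake-specific cpp code? ... cpp/core? (I'll be OK with both, since our cpp code is not that well-organized yet. For example, the remote shuffle code is still in cpp/core.)

And some other questions for the whole design:
- To what extent should we modify backends' own code? For example, do we have to apply changes to backends-velox, backends-clickhouse, cpp/velox, etc? (except for test code if any)
- IIUC each lake format will have its own Maven module (correct me if I was wrong). So are we going to compile the modules into independent jars to let users drop in, or just to include the compiled code into the Gluten fat jar?
- Are we going to have individual Spark config options for each lake format? Do we have to put them into GlutenConfig.scala or somewhere in lake format's own Maven module?

Again, thank you everyone for bringing this up. Can't wait to see it landing.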
@zhztheplayer
would you mean to completely remove the problematical case-class-extending practices from code base?
I have encountered this issue before, and I also suggest that the community remove this inheritance relationship. We can try to do it.
Was all the needed Velox code already contributed to upstream Velox?
Not yet. But they have already merged many prerequisite PRs, and the PR supporting Iceberg has also been proposed. It should be merged relatively quickly.
Do you already have some thoughts to organize new lake-specific cpp code?
I have successfully used the relevant PR from the Velox community to build the flow for reading Iceberg MOR tables. In gluten/cpp, there should only be code that converts Substrait to the Velox Iceberg split; the MOR processing itself is implemented in Velox. For other lake formats there is currently no prerequisite native work for MOR tables, but with this framework we can start by supporting COW tables first.
@zhztheplayer
This looks like a very common feature. Did you already have some thoughts on the way to implement this? By a service loader or some kind of reflection tools? Also, should we care about the applying order for the plugged rules?
Yep, maybe using a service loader, as org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource does. I think we don't need to care about the order between different lake format modules, but each rule in its own module should guarantee that it only rewrites plans belonging to its own table format.
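Here is a sketch of the service-loader approach using java.util.ServiceLoader. This is the same JDK mechanism Spark uses in DataSource.lookupDataSource to discover DataSourceRegister implementations. PlanRewriteProvider and RewriteRuleLoader are hypothetical, and plans are modeled as strings to keep the example self-contained. A lake-format jar would register its provider in a META-INF/services file.

```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._

// Hypothetical SPI standing in for the proposed RewritePlanRules trait.
// Plans are plain strings here to keep the sketch self-contained.
trait PlanRewriteProvider {
  def rules: Seq[String => String]
}

object RewriteRuleLoader {
  // Finds every provider listed in META-INF/services/<binary name of PlanRewriteProvider>
  // on the classpath, e.g. a file shipped inside a lake-format jar.
  // With no lake-format jar present this simply returns no rules.
  def loadRules(loader: ClassLoader = Thread.currentThread().getContextClassLoader): Seq[String => String] =
    ServiceLoader.load(classOf[PlanRewriteProvider], loader).asScala.toSeq.flatMap(_.rules)

  // Each rule is expected to leave plans of other formats untouched, so the order
  // between modules should not matter; rules are still applied in sequence.
  def rewrite(plan: String, rules: Seq[String => String]): String =
    rules.foldLeft(plan)((current, rule) => rule(current))
}
```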
would you mean to completely remove the problematical case-class-extending practices from code base?
In fact, I'm aware of the problem. But this does not affect the design, so it can be discussed separately. cc @liujiayi771
To what extent should we modify backends' own code? For example, do we have to apply changes to backends-velox, backends-clickhouse, cpp/velox, etc? (except for test code if any)
Based on my design, I plan to move the logic of IteratorApi.genFilePartition into the scan transformers. Nothing else needs to be modified.
IIUC each lake format will have its own Maven module (correct me if I was wrong). So are we going to compile the modules into independent jars to let users drop in, or just to include the compiled code into the Gluten fat jar?
Yep, the former. If users want to use a lake format, they need to put the separate jar on the classpath; it is not bundled into a fat jar.
Are we going to have individual Spark config options for each lake format? Do we have to put them into GlutenConfig.scala or somewhere in lake format's own Maven module?
There are no configs in this design. I hope users can enable native queries against lake formats just by putting the gluten-lakeformat jar on the classpath.
Hi @YannByron and @liujiayi771, thank you guys for such a detailed design!
RewritePlanRules and Column Mapping: are there any other scenarios that will need such RewritePlanRules? If the original plan changes, what will happen if fallback happens on the changed node? What if this scan has filter pushdown? I think it's not necessary to rewrite the plan if column mapping can be done in the transformer layer.
To support the mor table, a native reader is necessary and it should support cow and mor table. With this, this mehtod should return the specific ReadFileFormat, e.g. DeltaReadFileFormat, IcebergReadFileFormat.
Does it mean that in MOR table case, we don't need to care it's a parquet or orc file?
IIUC each lake format will have its own Maven module (correct me if I was wrong). So are we going to compile the modules into independent jars to let users drop in, or just to include the compiled code into the Gluten fat jar?
Yep. the former way. If users want to use lake format, they need to put the separate jar to the classpath, not a fat jar.
My understanding is that we don't have a direct dependency on these lake-related jars but do our job based on Spark interfaces. The data lake read/write offloading to Gluten/Velox is then transparent to users whose Spark + data lake setup already works properly, right?
@yma11
RewritePlanRules and Column Mapping: are there any other scenarios that will need such RewritePlanRules? If the original plan changes, what will happen if fallback happens on the changed node? What about if this scan has filter pushdown? I think it's not necessary to do this rewrite on plan if the Column Mapping can be done in transformer layer.
I don't know whether there are other cases that need RewritePlan or RewriteTransformer. At least Delta column mapping does, and the framework is there to use if needed. Even if a plan is rewritten, the original, correct plan can be returned when fallback happens, because the original plan is persisted before any rule is applied in Gluten. I think either RewritePlan or RewriteTransformer works. I chose the first one because it may be easier to understand.
Does it mean that in MOR table case, we don't need to care it's a parquet or orc file?
In the Java layer, yes. But enough information (the base file's format, the delete files' format) needs to be passed as options to the native layer.
My understanding is that we don't have direct dependency on these lake related jars but do our job based on Spark interfaces. The data lake read/write offloading to Gluten/velox is transparent for users who have Spark+data lake properly worked. right?
These modules (like gluten-delta and gluten-iceberg) depend on lake-related jars, but gluten-core doesn't. Users just need to put the gluten-datalake jar on the classpath, and then it's transparent to them.
@zhztheplayer Here is an updated answer to this question.
would you mean to completely remove the problematical case-class-extending practices from code base?
I tried to remove this case-class extension today. I found that it is used to retrieve the filteredPartitions of BatchScanExec. However, that member is private and cannot be accessed directly, which is why inheritance is used to obtain the required InputPartitions. Apart from the current implementation, reflection seems to be the only option. Our refactoring does not solve the problem of obtaining the InputPartitions.
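For reference, the reflection route mentioned above could look roughly like the sketch below. FakeBatchScanExec is a toy class with a private zero-argument method, and PrivateAccess is a hypothetical helper. In real Spark the member belongs to BatchScanExec and its exact shape differs across Spark versions, which is the main drawback of this approach. Note also that getDeclaredMethod only searches the given class, not its superclasses.

```scala
// Toy class standing in for BatchScanExec; its partition accessor is private.
class FakeBatchScanExec(parts: Seq[String]) {
  private def filteredPartitions(): Seq[Seq[String]] = parts.grouped(2).toList
}

object PrivateAccess {
  // Reflectively invokes a zero-argument private method declared on target's own class.
  // This bypasses the compiler, so it breaks at runtime, not compile time, if the
  // member is renamed or changes shape in a new Spark version.
  def invokePrivate[T](target: AnyRef, methodName: String): T = {
    val method = target.getClass.getDeclaredMethod(methodName)
    method.setAccessible(true)
    method.invoke(target).asInstanceOf[T]
  }
}
```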
Update: after discussing with @yma11 offline, we chose the solution that rewrites the Transformer, not the SparkPlan, in schema-related scenarios.
@YannByron @liujiayi771 Thank you both for your active involvement. Looking forward to your PRs!
Greetings,
We would like to join the discussion and ask for your guidance - we are developing a new Velox connector for integrating our storage engine with the Gluten project. IIUC, the current Gluten design allows pushing down file-based scans of supported formats (e.g. Parquet, ORC, ...) into Velox. However, our storage engine uses RPC-based communication for metadata and data retrieval, similar to the Arrow Flight protocol. Currently we have a Spark Datasource V2 connector and we would like to integrate it with the Gluten project.
We have a few thoughts and would be happy to hear your opinions:
Please let us know what you think the best way forward is.
Hi @rz-vastdata. We are currently working on implementing the design mentioned above. Iceberg may need a new file format as well. You are welcome to review our PRs and provide suggestions. @yma11 We need to speed up PR review, as there are many additional tasks to do.
We are currently working on implementing the design mentioned above. Iceberg may need a new file format as well.
Sounds great, thanks for the update!
You can participate in the review of our PRs and provide some suggestions.
We'd be happy to participate in the review. Could you please let us know which issues/PRs are related to the above design? Is there a label (https://github.com/oap-project/gluten/labels) or an umbrella issue we can follow?
@rz-vastdata I have modified the title of the PR, and it will reference this issue.
Following up on https://github.com/oap-project/gluten/pull/3650, IIUC the current approach is to accelerate file-based data sources in Gluten, right?
Asking since in our case the Vast data source is based on a generic Scan (not a more specific FileScan), which currently causes it to fail conversion to a Gluten plan, e.g.:
https://github.com/oap-project/gluten/blob/30865e5874e33c5996194352f43c911960673cd2/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala#L60-L65
https://github.com/oap-project/gluten/blob/30865e5874e33c5996194352f43c911960673cd2/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala#L483-L508
https://github.com/oap-project/gluten/blob/b40a5f094d911cccaa8eb90a11fe44f78038ea94/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala#L65-L81
Would it be possible to support converting custom Scan-based data sources into Gluten-supported inputs?
I understand that this also requires adding support to the Substrait protobuf definition and the Velox source code, but it seems to better match the use case where the connector doesn't use files to store the data. For example, in our case there is a custom protocol for retrieving data from an external RPC server, so there is no need for file-based I/O and abstractions.
BTW, how is this handled when Gluten is used to interface with external hardware devices (e.g. FPGA/GPU/ASIC accelerators, as shown in https://user-images.githubusercontent.com/47296334/199617207-1140698a-4d53-462d-9bc7-303d14be060b.png)?
Since #3650 has been merged, I suggest defining a SplitInfo proto on the Gluten side, i.e. not using the Substrait LocalFiles. Then we can decouple SplitInfo from the Substrait plan: the plan is immutable within a given stage, but SplitInfo is mutable. This brings a clearer design and interface, and helps reduce serialization cost.
@Yohahaha What's the purpose of making SplitInfo mutable? I think any modification to SplitInfo should be done before it's created. It's generated from the Spark plan, which means it is originally part of the plan info. Currently all plan-related info is passed to the Velox side through Substrait, and it's better not to scatter such info in Gluten.
@yma11
plan is immutable in specific stage but SplitInfo is mutable
By "the plan (or plan fragment) is immutable" I mean it does not contain task info; by "SplitInfo is mutable" I mean each task of the stage has its own SplitInfo to process.
Combining them adds a lot of complexity: we need to put each task's SplitInfo into the plan and then serialize the whole plan. We encountered a memory issue before, and now a serialization cost issue.
it's better not to scatter such info in Gluten.
So you'd like to use LocalFilesNode for all formats? Does it satisfy all cases?
Combine these bring much complexity, we need pass each task's SplitInfo into plan and then serialize whole plan. We encounter memory issue before, and serialization cost issue now.
The serialized Substrait plan is a complete definition of the corresponding Velox task, so it needs to contain SplitInfo. Serialization happens for each task even if we don't pass SplitInfo, since the plan may still contain other task-specific info. Even if you serialize only once on the driver side, you still need to combine the shared plan part with the task-specific info, which will not be straightforward or clean. You may get some memory savings, but execution time is unlikely to benefit.
it's better not to scatter such info in Gluten.
so you'd like use LocalFileNode for all format? does it satisfy all case?
I am not sure. But we are quite open to adding more necessary fields to this proto if needed.
@rz-vastdata #3650 is just preparatory work. The next PR will be submitted next week and will include IcebergScanTransformer, which contains the Iceberg-specific way of generating SplitInfo. Iceberg's Scan also implements Spark's Scan interface, which I understand is consistent with your scenario.
@yma11 It seems that some of the information required for the Iceberg format can be added to LocalFilesNode, but it may look somewhat messy. We can look at it together later. The next PR will not include this part yet, as it only supports COW tables. We need to wait for the Iceberg-related PRs in the Velox community to be merged before we submit these changes. However, we can put up an initial draft PR before that.
Iceberg's Scan also inherits from Spark Scan, which I understand is consistent with your scenario.
Sounds good, many thanks @liujiayi771!
By the way, is there an open-source example for Gluten being used to interface with external devices (e.g. FPGA/GPU/ASIC accelerators, as shown at https://github.com/oap-project/gluten#2-architecture), e.g. when the data sources are not file-based?
By the way, is there an open-source example for Gluten being used to interface with external devices (e.g. FPGA/GPU/ASIC accelerators, as shown at https://github.com/oap-project/gluten#2-architecture), e.g. when the data sources are not file-based?
I'm working on the FPGA version
Why is there a DeleteFile option in the Iceberg ReadOptions? Presumably you aren't actually deleting files as part of a read operation. Are you ignoring the files instead? For that matter, why is the option that says how to delete named FileContent?
Why is there a DeleteFile option to the Iceberg ReadOptions?
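Iceberg uses delete files to exclude some records in data files: nothing is deleted during the read. Position deletes identify rows by data file path and row position, and equality deletes identify rows by column values. FileContent tells which of the two kinds a delete file holds.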
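To make the answer concrete, here is a minimal sketch of merge-on-read filtering under simplifying assumptions. Rows are maps, position deletes are (data file path, row position) pairs, and equality deletes are column/value maps. It ignores sequence numbers, which in Iceberg decide which data files a delete applies to, so it is not a faithful implementation of the spec. Row, PositionDelete, EqualityDelete and MergeOnReadSketch are hypothetical names.

```scala
// Simplified row: column name -> value.
final case class Row(values: Map[String, Any])

// Position delete: the row at 0-based position `pos` of data file `filePath` is deleted.
final case class PositionDelete(filePath: String, pos: Long)

// Equality delete: any row whose columns match all of these values is deleted.
final case class EqualityDelete(values: Map[String, Any])

object MergeOnReadSketch {
  // Returns the rows of one data file that survive the delete files.
  def read(dataFile: String, rows: Seq[Row], posDeletes: Seq[PositionDelete], eqDeletes: Seq[EqualityDelete]): Seq[Row] = {
    // Only position deletes that point at this data file matter.
    val deletedPositions = posDeletes.collect { case PositionDelete(`dataFile`, p) => p }.toSet
    def matchesEqualityDelete(row: Row): Boolean =
      eqDeletes.exists(d => d.values.forall { case (k, v) => row.values.get(k).contains(v) })
    rows.zipWithIndex.collect {
      case (row, idx) if !deletedPositions.contains(idx.toLong) && !matchesEqualityDelete(row) => row
    }
  }
}
```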
Description
Currently there are 2 PRs open in Gluten to support Iceberg COW table reads and Delta Lake reads, and there is also an active discussion in Velox about Iceberg read support. Consolidating these ideas, and based on Gluten's position, we would like to share a draft unified design for data lake read support in Gluten.
As stated on this project's home page, one of Gluten's key functions is to transform Spark's whole-stage physical plan into a Substrait plan and send it to the native side. This applies to data lake read support as well, thus:
We'd better avoid hacking the original Spark physical plan nodes. Gluten core has plan transformers that generate correct plan info in Substrait format and pass it to Velox for reading and computation. So whatever the hack is, it should be doable in the transformer layer, e.g. column mapping. IMO, we should do our best to pass the original info in the Spark plan through to Velox, acting as a bridge, and consume it correctly on the Velox side, unless that is not doable or Velox can't support it. By the way, features like column mapping are common to many file format readers, so Velox can handle them at its datasource level, and the Velox community plans to do so.
A clear transformer hierarchy is needed for the different data lake backends. In the Iceberg COW table read PR, a new branch is added to do Iceberg-specific processing, using a utility class placed in a dedicated folder. In the future, I believe more branches will be needed to support other cases, like MOR. So introducing a new transformer that inherits from BatchScanExecTransformer would be a better way. The possible hierarchy should look like the following:

@YannByron @felipepessoto @liujiayi771 @ulysses-you, please comment on the above suggestions. Thanks.