Goal
In this issue we set up the project to support the Iceberg table format in the Presto native engine, Prestissimo. This project will enable users to read Iceberg tables written by Presto or other engines and will support basic row-level updates. Our goal is to support all the operations required by the TPC-DS benchmark.
Functional Requirements
The existing Iceberg connector already supports basic DDL and DML statements and some metadata queries. These are mostly metadata operations and are common to both Presto and Prestissimo. In this issue we will focus on the following features:
Select (read path) - (Pri 0)
Latest snapshot
Time travel
Bulk Inserts - (Pri 0)
Row level Updates (write path)
Insert - (Pri 1)
Delete - (Pri 1)
Update - (Pri 2)
Merge into - (Pri 2)
Data Maintenance - (Pri 2)
Snapshot cleaning
Compaction
Schema Evolution - (Pri 3)
The file types that will be supported
Parquet
ORC
DWRF
Architecture
This document will only consider the read path.
This project has three parts:
Scan planning (On coordinator)
Split serialization and deserialization (presto_cpp)
Scan and delete filter pushdown (Velox)
Scan Planning
In the Presto architecture, the scan planning will be done on the coordinator:
Provide IcebergMetadata during planning
Partition and File level pruning
Enumerate IcebergSplit
There is already basic Iceberg support in Presto via the Iceberg connector. We will continue to use this structure and add functionality such as reading the delete files.
Note: There is an ongoing discussion about consolidating all the Hive connectors into a single entity. That is a separate effort that can be done later.
The scan planning is supported by the Iceberg library that is already imported in Presto.
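For illustration, the sketch below shows how a snapshot's data files, together with the delete files that apply to them, can be enumerated through the Iceberg scan API. It is not the connector's planning code; the table, snapshot id, and predicate are placeholders, and the actual connector wraps each FileScanTask into an IcebergSplit instead of printing it.

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

// Rough illustration only: enumerate the data files (and the delete files that
// apply to them) for one snapshot, with a pushed-down predicate for pruning.
public class PlanFilesExample {
  public static void planScan(Table table, long snapshotId) throws Exception {
    try (CloseableIterable<FileScanTask> tasks = table.newScan()
        .useSnapshot(snapshotId)                       // target snapshot
        .filter(Expressions.greaterThan("order_id", 1000)) // enables partition/file pruning
        .planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path());        // data file backing one split
        System.out.println(task.deletes().size());     // delete files that apply to it
      }
    }
  }
}
```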
Acquiring Metadata
During planning, the MetadataManager will try to get the table handle for the Iceberg table to be read. It will try to load an Iceberg table from the catalog; the loaded table is an org.apache.iceberg.Table object. This Table exposes extensive metadata about the Iceberg table (see the sketch after this list), including but not limited to:
Schema
Table location
Snapshots
Update history
Sort orders
Supported operations
etc.
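As a rough, self-contained sketch (not the connector's MetadataManager code; the catalog type, warehouse path, and table name are placeholders), loading the table and reading this metadata through the Iceberg API could look like:

```java
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Illustrative only: load an Iceberg table and read the metadata the planner needs.
public class LoadTableExample {
  public static void main(String[] args) {
    HadoopCatalog catalog =
        new HadoopCatalog(new Configuration(), "hdfs:///warehouse");   // hypothetical warehouse path
    Table table = catalog.loadTable(TableIdentifier.of("db", "orders")); // hypothetical table name

    Schema schema = table.schema();                  // table schema
    String location = table.location();              // table location
    Snapshot current = table.currentSnapshot();       // latest snapshot (null for an empty table)
    List<HistoryEntry> history = table.history();     // update history, used for time travel
    System.out.printf("%s %s %s %d entries%n", schema, location, current, history.size());
  }
}
```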
Finding Snapshots
By default, if the user doesn't specify a snapshot or timestamp, the latest snapshot will be picked from the org.apache.iceberg.Table object. If the user specifies a timestamp or snapshot id in the past, the update history, which is a List<HistoryEntry>, should be searched to find the right snapshot. The input version (snapshot id) or timestamp comes from the SQL query through the "FOR VERSION AS OF" and "FOR TIMESTAMP AS OF" clauses.
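A minimal sketch of the history search, assuming the standard Iceberg HistoryEntry accessors (Iceberg also ships helpers such as SnapshotUtil, so the final implementation may not search the list by hand):

```java
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Table;

// Illustrative only: resolve a "FOR TIMESTAMP AS OF" request by picking the newest
// snapshot committed at or before the requested timestamp.
public class SnapshotResolver {
  public static long snapshotIdAsOf(Table table, long timestampMillis) {
    Long result = null;
    long bestTimestamp = Long.MIN_VALUE;
    for (HistoryEntry entry : table.history()) {
      if (entry.timestampMillis() <= timestampMillis && entry.timestampMillis() > bestTimestamp) {
        bestTimestamp = entry.timestampMillis();   // keep the latest entry not after the target
        result = entry.snapshotId();
      }
    }
    if (result == null) {
      throw new IllegalArgumentException("No snapshot at or before " + timestampMillis);
    }
    return result;
  }
}
```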
Once the target snapshot is found, the metadata will be used to compose the IcebergTableHandle, which will be included in the TableScanNode in the plan fragment and sent to the workers via TaskUpdate requests.
Filter Pushdown
Since the current Presto Iceberg support is implemented as a connector, the IcebergPlanOptimizer rule is applied along with the other connector-specific rules towards the end of plan optimization. This rule decomposes the filter from the FilterNode in the plan into:
Range TupleDomain filters that apply to entire columns
The remaining filter
The range filters would be pushed down to the underlying TableScanNode, and the remaining filter would be kept in the FilterNode.
This is different from the HiveFilterPushdown rule, because the Hive connector supports pushing down subfield filters. In that rule the filter is decomposed into four parts:
1. Range TupleDomain filters that apply to entire columns
2. Range TupleDomain filters that apply to subfields
3. Dynamic filters
4. The remaining filter
Parts 1 and 2 would be pushed down as TupleDomainFilters, part 4 would also be pushed to the TableScan as filter functions, and part 3, if any, would remain in the FilterNode.
In the native Iceberg support, we intend to push down the filters in the same way as HiveFilterPushdown. This requires distinguishing between the Java implementation and the native C++ implementation, since the Java implementation does NOT support full filter pushdown for Parquet files. We may add a new session property to distinguish the two implementations.
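To make the decomposition concrete, the sketch below shows, for a hypothetical filter, which conjunct can be expressed as a column-level TupleDomain and which part is left as the remaining filter. It assumes prestodb's TupleDomain/Domain/Range/ValueSet classes under com.facebook.presto.common.predicate (package names may differ across versions) and is not the IcebergPlanOptimizer or HiveFilterPushdown code:

```java
import java.util.Map;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import static com.facebook.presto.common.type.BigintType.BIGINT;

// Conceptual illustration only. For a filter like:
//   WHERE order_id > 1000 AND lower(comment) LIKE '%late%'
// the first conjunct is a column-level range and can be pushed down as a
// TupleDomain; the second cannot, so it is left behind as the "remaining"
// filter (kept in the FilterNode today, or pushed as a filter function in
// the Hive-style pushdown).
public class FilterSplitExample {
  public static void main(String[] args) {
    TupleDomain<String> pushedDown = TupleDomain.withColumnDomains(Map.of(
        "order_id",
        Domain.create(ValueSet.ofRanges(Range.greaterThan(BIGINT, 1000L)), false)));
    String remaining = "lower(comment) LIKE '%late%'";   // stays as a row expression

    System.out.println("pushed to scan: " + pushedDown);
    System.out.println("remaining filter: " + remaining);
  }
}
```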
Split Ser/De In presto_cpp
The IcebergSplit will then be serialized and sent from the coordinator to the native worker (Velox) via the communication protocol built into presto_cpp. presto_cpp is a wrapper around the native worker and implements the communication protocol the workers and the coordinator use. In this project, we need to add the following functionality to the presto_cpp module:
Register the Iceberg catalog
Serialize/Deserialize the splits
Since we don’t intend to add a separate Iceberg connector in Velox, we will consider the Iceberg implementation a subclass of the Hive connector, and register it as one of the Hive connectors.
Serializing the IcebergSplit requires us to convert an existing Presto IcebergSplit to a HiveIcebergSplit in Velox. HiveIcebergSplit will be a subclass of HiveConnectorSplit and will contain a list of Iceberg DeleteFiles. The following structures need to be serialized (a simplified sketch of the split fields follows this list):
IcebergSplit
DeleteFile
IcebergTableHandle
IcebergColumnHandle
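As a simplified sketch of the information that has to survive the round trip (illustrative field names only, not the actual IcebergSplit, DeleteFile, or Velox HiveIcebergSplit definitions), a Jackson-serializable split carrying its delete files might look like:

```java
import java.util.List;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative field set only; the real classes carry additional state,
// including the table and column handles listed above.
public class IcebergSplitSketch {
  public static class DeleteFileSketch {
    public final String path;       // delete file location
    public final String content;    // "POSITION_DELETES" or "EQUALITY_DELETES"

    @JsonCreator
    public DeleteFileSketch(
        @JsonProperty("path") String path,
        @JsonProperty("content") String content) {
      this.path = path;
      this.content = content;
    }
  }

  public final String path;                     // data file to scan
  public final long start;                      // split start offset
  public final long length;                     // split length in bytes
  public final List<DeleteFileSketch> deletes;  // delete files applying to this data file

  @JsonCreator
  public IcebergSplitSketch(
      @JsonProperty("path") String path,
      @JsonProperty("start") long start,
      @JsonProperty("length") long length,
      @JsonProperty("deletes") List<DeleteFileSketch> deletes) {
    this.path = path;
    this.start = start;
    this.length = length;
    this.deletes = deletes;
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    IcebergSplitSketch split = new IcebergSplitSketch(
        "s3://bucket/tbl/data/00000.parquet", 0, 1024,
        List.of(new DeleteFileSketch("s3://bucket/tbl/data/delete-1.parquet", "POSITION_DELETES")));
    String json = mapper.writeValueAsString(split);                            // coordinator side
    IcebergSplitSketch back = mapper.readValue(json, IcebergSplitSketch.class); // worker side
    System.out.println(json + " -> " + back.deletes.size() + " delete file(s)");
  }
}
```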
Delete Reader In Velox
This will be described in a separate GitHub issue in the Velox repository.