apache / iceberg-rust

Apache Iceberg
https://rust.iceberg.apache.org/
Apache License 2.0

feat: make file scan task serializable #377

Closed ZENOTME closed 1 month ago

ZENOTME commented 1 month ago

There is a use case of file scan tasks for the compute engine:

  1. Compute the file scan tasks and shuffle them to the compute nodes
  2. The compute nodes do the scan work in parallel

In this case, the compute engine needs to be able to:

  1. Access the FileScanTask directly
  2. Serialize and Deserialize the FileScanTask

I drafted this PR to make them accessible. Serializing and deserializing the ManifestEntry would take more work, and I found that the reader only needs the file path from the ManifestEntry. It seems the metadata in ManifestEntry is used in the planning phase to prune files; once planning is complete, most of that metadata is no longer needed, and we can add whatever metadata scanning requires in the future. So I made the FileScanTask contain the data file path only. Please let me know if this assumption is wrong.
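
For illustration, a minimal sketch of what a path-only, serializable FileScanTask could look like (the derive and field choices here are assumptions for the example, not the final API):

use serde::{Deserialize, Serialize};

/// Illustrative sketch: keep only what the reader needs to open the file,
/// rather than carrying the full ManifestEntry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileScanTask {
    /// Path of the data file to scan.
    pub data_file_path: String,
}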

ZENOTME commented 1 month ago

cc @liurenjie1024 @Fokko @Xuanwo @sdd @viirya

sdd commented 1 month ago

This seems reasonable, but perhaps we should consider having this as a separate method alongside the existing plan_files, so that anyone who is using the existing stream of file plan tasks does not get broken by this.

ZENOTME commented 1 month ago

> This seems reasonable, but perhaps we should consider having this as a separate method alongside the existing plan_files, so that anyone who is using the existing stream of file plan tasks does not get broken by this.

I don't think we need a separate method for this. 🤔 We just need to make FileScanTask implement Serialize and Deserialize; then the user can call plan_files() to get the FileScanTasks and transfer them to the compute nodes. A simple case might look like the following, reading all the files at once. The user can also use the stream interface for further optimization, e.g. reading in a streaming way.

// plan_files() is async and returns a stream of file scan task results.
let plan_file_stream = scan.plan_files().await?;

// Read all file scan tasks from the stream
// (#[for_await] comes from the futures_async_stream crate).
let mut file_scans = vec![];
#[for_await]
for file_scan in plan_file_stream {
    file_scans.push(file_scan?);
}

// Send the file scan tasks to the compute node. The compute node can
// read them all at once and use the reader to read the data.
arrow_reader(stream::iter(file_scans))
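
For the transfer step, here is a rough sketch of the serialize/deserialize round trip between the planner and a compute node, assuming FileScanTask derives serde's Serialize and Deserialize; serde_json is only a placeholder wire format and the actual shuffle transport is out of scope:

// Planner side: turn each planned task into a wire payload before
// shuffling it to a compute node.
fn encode_tasks(tasks: &[FileScanTask]) -> serde_json::Result<Vec<String>> {
    tasks.iter().map(serde_json::to_string).collect()
}

// Compute node side: rebuild the tasks from the payloads and feed them
// to the reader as a stream, just like the local case above.
fn decode_tasks(payloads: &[String]) -> serde_json::Result<Vec<FileScanTask>> {
    payloads.iter().map(|p| serde_json::from_str(p)).collect()
}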
liurenjie1024 commented 1 month ago

The idea looks good to me, but I'm not sure we should modify FileScanTask now. Should we postpone this until we start to integrate with a real engine such as datafusion or ballista?

Fokko commented 1 month ago

I'm in favor of this change since it will keep the API much cleaner. The ManifestEntry will change over time, so it may be good to keep Iceberg-specific things out of there. The pattern of the FileScanTask as an interface has been proven by Iceberg both in Java and in Python.