apache / iceberg-rust

Apache Iceberg
https://rust.iceberg.apache.org/
Apache License 2.0

Enhancement: refine the reader interface #398

Open ZENOTME opened 3 weeks ago

ZENOTME commented 3 weeks ago

Hi, I find that in some cases our reader interface seems redundant.

E.g.

let table_scan = table
    .scan()
    .with_batch_size(Some(self.batch_size))
    .select(self.schema.names())
    .with_predicate(predicate)
    .build()
    .map_err(BatchError::Iceberg)?;
let file_scan_stream = table_scan.plan_files();

// Create a reader here. We have to pass the info already given to table_scan again.
let reader = ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone())
    .with_field_id(....)
    .with_predicate(..)
    .build();

for file_scan in file_scan_stream {
    let arrow_batch_stream = reader.read(file_scan);
}

I recommend moving the field_id and predicate info into the file scan task. Configuring this info again in the reader is not user friendly and is prone to inconsistency.
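
To make the idea concrete, here is a small self-contained sketch with stand-in types (illustrative only, not the real iceberg-rust API): once the task itself carries the projection and predicate decided at planning time, a single reader can consume any task without being reconfigured.

// Stand-in types for illustration only; the real FileScanTask/ArrowReader
// live in the iceberg crate and look different.
#[derive(Debug, Clone)]
struct BoundPredicate(String); // placeholder for a bound filter predicate

#[derive(Debug, Clone)]
struct FileScanTask {
    data_file_path: String,
    project_field_ids: Vec<i32>,       // columns chosen via select() at plan time
    predicate: Option<BoundPredicate>, // filter bound at plan time
}

struct ArrowReader; // holds only I/O state, no per-scan configuration

impl ArrowReader {
    // The reader reads exactly what the task describes, so there is nothing
    // to keep in sync with the scan that produced it.
    fn read(&self, task: &FileScanTask) {
        println!(
            "read {} with fields {:?} and predicate {:?}",
            task.data_file_path, task.project_field_ids, task.predicate
        );
    }
}

fn main() {
    let task = FileScanTask {
        data_file_path: "data/file-1.parquet".to_string(),
        project_field_ids: vec![1, 2],
        predicate: Some(BoundPredicate("id > 10".to_string())),
    };
    ArrowReader.read(&task);
}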

ZENOTME commented 3 weeks ago

cc @liurenjie1024 @sdd @Fokko @Xuanwo @viirya

ZENOTME commented 2 weeks ago

I sent a PR (#401) to draft the idea. Feel free to tell me if there is anything that can be improved.

liurenjie1024 commented 2 weeks ago

Hi @ZENOTME, I think there already exists a to_arrow method here: https://github.com/apache/iceberg-rust/blob/15e61f23198c4cc5d320d631e22e2fbc02d167c8/crates/iceberg/src/scan.rs#L294

liurenjie1024 commented 2 weeks ago

But I agree that we should make the ArrowReaderBuilder's methods crate private rather than a public API, since it's error prone.
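
As a side note, a minimal library-side sketch of what that visibility change could look like (the struct and method names here are illustrative, not the actual ArrowReaderBuilder API):

// Keep the builder itself public, but make the setters that must stay
// consistent with the scan crate-private.
pub struct ArrowReaderBuilder {
    field_ids: Vec<i32>,
}

impl ArrowReaderBuilder {
    pub fn new() -> Self {
        Self { field_ids: Vec::new() }
    }

    // pub(crate): only code inside the crate (e.g. the scan planner) can call
    // this, so users cannot pass values that disagree with the scan they built.
    pub(crate) fn with_field_ids(mut self, field_ids: Vec<i32>) -> Self {
        self.field_ids = field_ids;
        self
    }
}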

ZENOTME commented 2 weeks ago

Hi @ZENOTME, I think there already exists a to_arrow method here:

https://github.com/apache/iceberg-rust/blob/15e61f23198c4cc5d320d631e22e2fbc02d167c8/crates/iceberg/src/scan.rs#L294

Yes, but I think the benefit of this PR is more about the compute engine use case. to_arrow converts the scan into an arrow batch stream. What a compute engine expects, though, is to get the file scan tasks from the scan, split these tasks, and distribute them to different compute nodes so that it can read in parallel. I think that's one reason we provide https://github.com/apache/iceberg-rust/blob/15e61f23198c4cc5d320d631e22e2fbc02d167c8/crates/iceberg/src/scan.rs#L201.

For this use case, the user creates the reader and uses it to convert the file scan tasks into arrow batch streams rather than using to_arrow directly, like the following:

let reader = ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone())
    .with_field_id(....)
    .with_predicate(..)
    .build();

for file_scan in file_scan_stream {
    let arrow_batch_stream = reader.read(file_scan);
}

But for now, the reader is not friendly for this use case. Providing the field_id and predicate info to the reader is redundant and prone to inconsistency, because these have already been used to create the scan. A more friendly way is to carry this necessary info in the scan task, so that the reader is "stateless" and the inconsistency problem goes away, like the following:

let reader = ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone()).build();

for file_scan in file_scan_stream {
    let arrow_batch_stream = reader.read(file_scan);
}
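
For the "split and distribute" part, a rough self-contained sketch (stand-in types again; the transport is just printing) of how a coordinator could hand serialized task chunks to workers, assuming the scan task is or becomes serializable:

use serde::{Deserialize, Serialize};

// Assumed: the scan task can be serialized so it can cross the wire.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct FileScanTask {
    data_file_path: String,
}

fn main() -> Result<(), serde_json::Error> {
    // Pretend these came from table_scan.plan_files()
    let tasks: Vec<FileScanTask> = (0..6)
        .map(|i| FileScanTask { data_file_path: format!("file-{i}.parquet") })
        .collect();

    // Split the planned tasks into one chunk per worker
    let workers = 3;
    let chunk_size = (tasks.len() + workers - 1) / workers;
    for (worker_id, chunk) in tasks.chunks(chunk_size).enumerate() {
        // A real engine would ship this over RPC, an Arrow Flight ticket, etc.
        let payload = serde_json::to_string(chunk)?;
        println!("worker {worker_id} <- {payload}");
    }
    Ok(())
}
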
liurenjie1024 commented 2 weeks ago

Thanks for the explanation, I'll take a look at the PR.

ZENOTME commented 2 weeks ago

The drawback of the current implementation is that carrying the metadata in every scan task may be costly. One solution is to provide an abstraction that groups the tasks so that they share the metadata:

[diagram: a task group holding one shared copy of the metadata for its scan tasks]
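
A hedged, type-level sketch of what such a grouping abstraction might look like (names are illustrative, not from the PR):

// One shared copy of the planning info, many lightweight per-file tasks.
struct BoundPredicate; // placeholder for the real bound predicate type
struct FileScanTask {
    data_file_path: String,
}

struct FileScanTaskGroup {
    // shared by every task in the group
    project_field_ids: Vec<i32>,
    predicate: Option<BoundPredicate>,
    // per-file information only
    tasks: Vec<FileScanTask>,
}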

Another design is to separate the metadata from the scan tasks. We provide an interface that lets users extract the metadata from a scan, so that they can create the reader from this metadata rather than constructing it themselves. (This way seems more flexible 🤔)

[diagram: the metadata kept separate from the scan tasks]

sdd commented 2 weeks ago

Thanks for this @ZENOTME. The first diagram in your comment above is exactly what I'm currently working to implement as part of a service that exposes an Apache Arrow Flight interface. Each Ticket will contain a serialized struct with some request metadata (including the filter predicate) and a list of scan tasks.
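
A hypothetical sketch of such a Ticket payload (the field names are made up, not taken from the service): one copy of the request metadata plus the scan tasks it applies to, serialized into the ticket bytes.

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct FileScanTask {
    data_file_path: String,
}

// What the bytes of a Flight Ticket would deserialize into.
#[derive(Serialize, Deserialize)]
struct TicketPayload {
    // shared request metadata, e.g. a serialized filter predicate
    predicate: Option<String>,
    project_field_ids: Vec<i32>,
    // the scan tasks that share it
    tasks: Vec<FileScanTask>,
}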

liurenjie1024 commented 2 weeks ago

Sorry, I don't quite get what the "meta" is here?

ZENOTME commented 2 weeks ago

Sorry, I don't quite get what the "meta" is here?

e.g. field_id, predicate. The info that can be shared by the set of FileScanTasks. Maybe "metadata" is not precise here.

liurenjie1024 commented 2 weeks ago

Sorry, I don't quite get what the "meta" is here?

e.g. field_id, predicate. The info that can be shared by the set of FileScanTasks. Maybe "metadata" is not precise here.

I agree that they are the same across different tasks, but I don't quite get how to share them. In a distributed query engine like Spark, tasks are distributed to different hosts; one way to share the metadata is to utilize broadcast in Spark, but that increases implementation complexity.

ZENOTME commented 2 weeks ago

Sorry, I don't quite get what the "meta" is here?

e.g. field_id, predicate. The info that can be shared by the set of FileScanTasks. Maybe "metadata" is not precise here.

I agree that they are the same across different tasks, but I don't quite get how to share them. In a distributed query engine like Spark, tasks are distributed to different hosts; one way to share the metadata is to utilize broadcast in Spark, but that increases implementation complexity.

In this model, it's the user's responsibility to share (distribute) the "metadata" to the different hosts. What we provide is a method to get the "metadata" from the scan, and the "metadata" is serializable/deserializable.

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Metadata {
    field_ids: Vec<i32>,
    predicate: BoundPredicate,
}

// master node
let metadata = scan.meta();
send(metadata);

// worker node: rebuild the reader from the received "metadata",
// then use it to read the received scan tasks
let metadata = receive();
let reader = ArrowReaderBuilder::new().with_metadata(metadata).build();
let scan_tasks = receive();
for task in scan_tasks {
    reader.read(task);
}

one way to share the metadata is to utilize broadcast in Spark

I'm not familiar with this, but I think it can be a way for Spark to distribute the "metadata". Different compute engines can do this in different ways. It's flexible, but as you say it increases implementation complexity.

liurenjie1024 commented 2 weeks ago

Different compute engines provide different capabilities, and I'm not sure it's worth this complexity, since including filters and projections in the plan usually doesn't add much overhead.