apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] Filter on `__row_index` #35301

Open Fokko opened 1 year ago

Fokko commented 1 year ago

Describe the enhancement requested

PyIceberg uses positional deletes to indicate which rows should be omitted. The normal table.filter(..) accepts a boolean mask, but a scanner cannot be constructed with one. Therefore we first have to read the data into a table and only then filter it. It would simplify the code (and speed it up) if this could be done directly while reading the data.

Having a __row_index column that we can filter on would greatly simplify this logic.
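
For context, a minimal sketch of the current two-step approach (the data path and delete positions are illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Hypothetical positional deletes: row positions that must be omitted.
deleted = {1, 3}

dataset = ds.dataset("data/", format="parquet")

# Today: materialize the full table first ...
table = dataset.to_table()

# ... then build a boolean mask from the positional deletes and filter.
mask = pa.array([i not in deleted for i in range(len(table))])
table = table.filter(mask)
```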

Component(s)

Python

jorisvandenbossche commented 1 year ago

Given that you want a positional delete, would this be a "take" operation rather than a "filter"? I know this is essentially the same (under the hood, filtering an array also does a "take" of the required values), but conceptually for a Dataset this might be different. A filter can be defined with an expression, but a "take" always uses actual materialized values. And we already have a Dataset.take() method that does exactly that.

So even if you start with a boolean mask, you can already use take() today by converting the mask to indices with pyarrow.compute.indices_nonzero (and we could still consider adding that conversion to Dataset.filter as a convenience).
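
A sketch of that workaround with today's API (the mask values are illustrative):

```python
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset("data/", format="parquet")

# Boolean mask marking the rows to keep (illustrative values).
keep_mask = pa.array([True, False, True, False, True])

# Convert the mask to positional indices of the True entries ...
indices = pc.indices_nonzero(keep_mask)

# ... and hand those to Dataset.take() instead of materializing first.
table = dataset.take(indices)
```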

I am not fully sure how Scanner::TakeRows works, given that positional indices depend on the order in which the data is scanned. I assume it follows the order of the actual vector of fragments.

westonpace commented 1 year ago

The scanner assumes that inputs are extremely large (or potentially infinite) streams. How would a mask be provided in that case?

I think it would be easier to create an order-dependent node that takes in a list of indices and skips those indices. Would that work? I don't think it would be much effort to create.

jorisvandenbossche commented 1 year ago

> I think it would be easier to create an order-dependent node that takes in a list of indices and skips those indices.

So like the current FetchNode, but then using an array of indices instead of a slice as input argument? (and omitting vs selecting)

Fokko commented 1 year ago

Based on @jorisvandenbossche's suggestion, I've updated the PR: https://github.com/apache/iceberg/pull/6775/files#diff-49144c27eab7e03926b0310aa8b513dbeb9c8d2a0d33bacf8dedbd88b4680aacR782-R787.

The .take() operation works perfectly for removing the deleted rows. The next step would be to first apply the deletes, then a user filter (an additional filter, applied after the mask), and push all of this down to Arrow as a single filter operation. This would avoid materializing the table and then filtering it down further.
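
A sketch of the two-step shape described here, which ideally would collapse into a single pushed-down operation (the "status" column is hypothetical):

```python
import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset("data/", format="parquet")

# Positions that survive the positional deletes (illustrative values).
kept_indices = [0, 2, 4, 5]

# Step 1: apply the positional deletes via take().
table = dataset.take(kept_indices)

# Step 2: apply the user's filter on the already-materialized table
# (the "status" column is hypothetical); ideally both steps would be
# pushed down to Arrow as one operation.
table = table.filter(pc.equal(table["status"], "active"))
```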

westonpace commented 1 year ago

> So like the current FetchNode, but then using an array of indices instead of a slice as input argument? (and omitting vs selecting)

Yes, it would need a sequencing queue like the fetch node, and the process function would be something like...

row_start = current_offset
row_end = row_start + batch.length
deleted_indices = get_deleted_indices_in_range(row_start, row_end)
if deleted_indices:
    # shift the global row indices into batch-local positions
    normalized = {i - row_start for i in deleted_indices}
    kept_indices = [i for i in range(batch.length) if i not in normalized]
    batch = batch.take(kept_indices)
output.input_received(batch)
current_offset = row_end  # advance the running offset for the next batch

wjones127 commented 1 year ago

Have you thought about how you want to handle these masks in conjunction with filter pushdown? I wonder if the best approach would be to expose a new hidden column __row_index in the scanner. Then you can pass down a filter (which can prune row groups or even data pages), and the data returned will have the original row indices in that __row_index column. From there you could filter the results for rows where the __filename column matches the file and the __row_index doesn't match any of the deleted rows in the mask. What do you think of that?
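
A hedged sketch of what that could look like; note that __row_index here is the proposed hidden column, not an existing scanner feature, and the file path, delete positions, and "status" filter are all illustrative:

```python
import pyarrow.dataset as ds

dataset = ds.dataset("data/", format="parquet")

# Positional deletes for one data file (illustrative values).
path = "data/part-00000.parquet"
deleted_rows = [4, 17, 23]

# Hypothetical: assumes the scanner exposes hidden __filename and
# __row_index columns, so the deletes become an ordinary expression
# that benefits from filter pushdown.
delete_expr = ~((ds.field("__filename") == path)
                & ds.field("__row_index").isin(deleted_rows))
user_expr = ds.field("status") == "active"  # hypothetical user filter

# Both the deletes and the user filter go down in a single scan.
table = dataset.to_table(filter=delete_expr & user_expr)
```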

westonpace commented 1 year ago

I was thinking of using the exec batch's index. However, I like the __row_index idea better: it wouldn't require any sequencing of batches, so it should be more performant.

rdblue commented 1 year ago

The __row_index idea is what we do in the Java Iceberg implementation. That works well.