Eventual-Inc / Daft

Distributed DataFrame for Python designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

How to filter by bucket in bucket partitioned Iceberg table #2105

Open rakeshJn opened 2 months ago

rakeshJn commented 2 months ago

This may not be a bug; it's more of a question.

I have an Iceberg table with a bucket partition. For example, one added like this:

ALTER TABLE {table} ADD PARTITION FIELD bucket(16, document_id) AS bucket

Now to take advantage of Iceberg's partition pruning, I want to filter by bucket number. For example:

df = daft.read_iceberg(table)
df = df.where('bucket==0')

What is the right way of doing such a filtering? Thanks.

samster25 commented 2 months ago

Hi @rakeshJn, thanks for making an issue!

Daft will automatically translate a filter on the table column into a filter on the bucket partition!

For example:

df = daft.read_iceberg(table)
df = df.where(df['document_id'] == 42)
df.explain(show_all=True) # verify that the partition predicate has been pushed down

This will compute the hash bucket for 42 and then apply it for partition pruning. You can verify this is the case with our explain method.
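For the curious, the bucket number comes from Iceberg's bucket transform, which the spec defines as (murmur3_x86_32(value) & Integer.MAX_VALUE) % N. Here is a minimal sketch of that computation, assuming the third-party mmh3 package; Daft and PyIceberg do this internally, so this is only for illustration:

import struct
import mmh3  # assumed third-party Murmur3 binding (pip install mmh3)

def iceberg_bucket(value, num_buckets):
    # Per the Iceberg spec, int/long values are hashed as 8-byte
    # little-endian longs and strings as their UTF-8 bytes.
    data = struct.pack("<q", value) if isinstance(value, int) else value.encode("utf-8")
    h = mmh3.hash(data, seed=0, signed=True)
    # `h & 0x7FFFFFFF` mirrors Java's `hash & Integer.MAX_VALUE`.
    return (h & 0x7FFFFFFF) % num_buckets

iceberg_bucket(42, 16)  # the bucket that rows with document_id == 42 land in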

rakeshJn commented 2 months ago

Thanks @samster25 for the quick reply. How does it know whether I want to filter by a literal document_id of 42 or by the hash bucket of document_id? After all, document_id is also a column in the table. Besides, my document_id is a string, so it is not computing a hash at all; instead it gives me an error:

DaftError::TypeError Cannot perform comparison on Utf8 and numeric type.
types: Utf8, Int32

Which means it is trying to filter by the literal value 42.

samster25 commented 2 months ago

Hi @rakeshJn, since Iceberg uses hidden partitioning, the bucket column is actually hidden from us. However, when predicates are applied via a filter, Daft checks whether there are any partition filters it can apply. It does this by keeping a mapping of table columns to partition columns.

So if document_id is a string, we should be able to run df = df.where(df['document_id'] == "42") and Daft will see that document_id is a partitioned column and push the filter down to the scan. We can verify this with the explain method.
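End to end, that looks like:

df = daft.read_iceberg(table)
df = df.where(df['document_id'] == "42")  # string literal, matching the Utf8 column type
df.explain(show_all=True)  # the pushed-down partition filter should show up in the plan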

In the case of hash bucketing, both == and != are valid.

rakeshJn commented 2 months ago

Ok, I see what you are saying @samster25. That works when I want to filter by a specific value, e.g. document_id == "42". What I want is something different. My table is bucket-partitioned on document_id into 16 buckets. That means the table has 16 partitions with roughly equal numbers of records in each, say 1.5M records per partition.

Now I want to query/filter this table by a specific bucket, i.e., give me all 1.5M records from bucket 0. Is that something I can achieve using Daft?

For reference, here's a related change on the Spark side: https://github.com/apache/iceberg/pull/7886

samster25 commented 2 months ago

I see what you mean! You can run the bucketing expression on the dataframe, which will give you the filtered rows. However, this won't prune out files until we push that expression down to the scan.

df = daft.read_iceberg(table)
df = df.where(df['document_id'].partitioning.iceberg_bucket(16) == 0)

I'll make an issue so that we can push this down!
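In the meantime, you can sanity-check where the expression lands with the same explain call shown earlier in the thread; per the above, expect it to appear as a regular filter rather than a partition pruning predicate until the pushdown is implemented:

df = daft.read_iceberg(table)
df = df.where(df['document_id'].partitioning.iceberg_bucket(16) == 0)
df.explain(show_all=True)  # check whether the bucket expression was pushed down or runs post-scan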