duckdb / duckdb_iceberg

Predicate Pushdown for scans #2

Open rustyconover opened 1 year ago

rustyconover commented 1 year ago

It currently appears that predicate pushdown isn't applied before determining which Parquet files are included in the scan of the table's data.

The Avro manifest files include statistics about each Parquet file; those should be leveraged to reduce the number of files that need to be read. Additionally, the manifests store the byte offset of each row group in each Parquet file, which removes the need to read each Parquet footer and allows better concurrent scanning.
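
To illustrate the second point, here is a small hypothetical sketch (not the extension's code) of how the row-group offsets recorded for a data file (the `split_offsets` field of the data_file struct in the Iceberg spec) could be turned into byte ranges for parallel readers without first fetching any Parquet footer; the entry contents are made up:

```python
def make_scan_tasks(entry):
    # split_offsets lists the starting byte of each row group; append the file
    # size so every row group gets an explicit [start, end) byte range.
    offsets = entry["split_offsets"] + [entry["file_size_in_bytes"]]
    return [
        {"file": entry["file_path"], "start": start, "length": end - start}
        for start, end in zip(offsets, offsets[1:])
    ]

# Illustrative manifest entry; field names follow the Iceberg data_file struct.
entry = {"file_path": "data/a.parquet", "file_size_in_bytes": 4096, "split_offsets": [4, 2048]}
print(make_scan_tasks(entry))
# [{'file': 'data/a.parquet', 'start': 4, 'length': 2044},
#  {'file': 'data/a.parquet', 'start': 2048, 'length': 2048}]
```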

samansmink commented 1 year ago

@rustyconover Hey, thanks for all the input! The issues you've submitted are all very valid points and are definitely on our to-do list for getting this extension beyond its initial proof-of-concept state!

rustyconover commented 1 year ago

Thanks Sam, I look forward to contributing.

Fokko commented 1 year ago

The way we do this in Iceberg (both Java and Python) is by using visitors.

When a query is executed, it runs against a snapshot; the snapshot has a reference to a ManifestList, which in turn references one or more Manifests. The manifests contain information that can be used to prune data files.
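
A rough, self-contained sketch of that traversal, with plain Python dicts standing in for the real classes and visitors (the structures and field names are illustrative, not the actual Java/Python API):

```python
def plan_files(snapshot, might_match):
    """might_match(bounds) decides whether a manifest or data file could contain matching rows."""
    files = []
    for manifest in snapshot["manifests"]:                  # reached via the ManifestList
        if not might_match(manifest["partition_summary"]):  # prune a whole manifest at once
            continue
        for entry in manifest["entries"]:                   # entries inside one Manifest
            if might_match(entry["bounds"]):                # prune individual data files
                files.append(entry["file_path"])
    return files

snapshot = {"manifests": [
    {"partition_summary": {"day": ("2024-01-01", "2024-01-31")},
     "entries": [{"file_path": "d1.parquet", "bounds": {"day": ("2024-01-10", "2024-01-20")}}]},
    {"partition_summary": {"day": ("2024-02-01", "2024-02-29")},
     "entries": [{"file_path": "d2.parquet", "bounds": {"day": ("2024-02-10", "2024-02-11")}}]},
]}

# Predicate day = '2024-01-15': the second manifest is pruned entirely; only d1.parquet survives.
want = lambda b: b["day"][0] <= "2024-01-15" <= b["day"][1]
print(plan_files(snapshot, want))   # ['d1.parquet']
```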

dev-goyal commented 10 months ago

Hey guys! Just wanted to check in and see if there might be updates on this, i.e., is this still prioritized? I wonder if it might be possible to have an intermediate solution in the meantime where, similar to read_parquet(), a list of files (both data and delete files, obtained with PyIceberg, say) could be passed to iceberg_scan(), greatly reducing the complexity of the scan.
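
For what it's worth, a workaround along those lines is already possible from Python: let PyIceberg do the scan planning and hand the surviving data files to DuckDB directly. A hedged sketch, assuming a PyIceberg version that exposes scan(...).plan_files(); the catalog and table names are placeholders, and it ignores delete files, so it is only correct for tables without deletes:

```python
import duckdb
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")            # placeholder catalog name
table = catalog.load_table("db.events")      # placeholder table name

# Let PyIceberg prune data files using manifest statistics and partitioning.
scan = table.scan(row_filter="event_date = '2024-01-15'")
files = [task.file.file_path for task in scan.plan_files()]

# NOTE: this sketch ignores positional/equality delete files.
print(duckdb.sql(f"SELECT count(*) FROM read_parquet({files})").fetchall())
```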

rustyconover commented 7 months ago

@Fokko can you please update your links to the Java and Python implementations, since the references moved when the Iceberg repos were split out?

Fokko commented 7 months ago

@rustyconover Certainly, updated!

rustyconover commented 7 months ago

Hi @samansmink and @Fokko,

I spent a few hours looking at this extension, DuckDB itself, and the Iceberg spec in order to plan an implementation. Right now I want to query Iceberg tables that consist of 100,000+ highly partitioned Parquet files, but without DuckDB supporting partitions and data file metrics that will be impossible. So I'm trying to figure out how to make this happen.

Here is what I've noticed:

TableFunction's bind_replace prevents access to query filters.

The use of the bind_replace function for iceberg_scan() means that the init calls, which receive the filter information for the query, are never made. The bind_replace function currently reads the Iceberg metadata and then substitutes itself in the query plan with a call to parquet_scan(), or with a subquery that applies the delete files and still calls parquet_scan().

I don't see a way to get access to the DuckDB query filters, since they are passed in the "init" phase of a TableFunction rather than in the bind phase. Once the function has been replaced by a parquet_scan(), it appears the init function of iceberg_scan() is never called.

@samansmink What do you think is a reasonable approach for giving iceberg_scan() access to the structure of the DuckDB query filters? If we had access to the requested filters at the same time we are reading the manifests and determining the list of Parquet files to scan, we could keep using parquet_scan(). @Mytherin did I overlook a way to obtain the query filters from within the bind function of a TableFunction?

One thought I had was to populate the metrics from the Iceberg manifests into the existing statistics infrastructure of parquet_scan(), but that would require a lot of reworking and could lead to high memory usage.

Manifest metrics and partition information

It appears the manifest metrics and partition information aren't currently parsed from the Avro files. Fixing this seems to only require some work on the generated code that parses the Avro files, and it can happen in parallel with the other improvements.
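
As a quick sanity check, those statistics are easy to inspect with a generic Avro reader. A sketch using fastavro (an assumed dependency; the manifest path is a placeholder, and the field names follow the spec's manifest-entry schema, so the exact shape may vary by writer):

```python
from fastavro import reader

with open("metadata/snap-manifest.avro", "rb") as fo:   # placeholder manifest path
    for record in reader(fo):
        data_file = record["data_file"]                  # manifest_entry -> data_file struct
        print(data_file["file_path"])
        print("  partition:   ", data_file.get("partition"))
        print("  value_counts:", data_file.get("value_counts"))
        print("  lower_bounds:", data_file.get("lower_bounds"))
        print("  upper_bounds:", data_file.get("upper_bounds"))
```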

rustyconover commented 7 months ago

It looks like work on the necessary DuckDB infrastructure has started here: https://github.com/duckdb/duckdb/pull/11806

peterboncz commented 6 months ago

@rustyconover: AFAIK the filter-pushdown with iceberg gets triggered.

Note that pushdown happens in two places: during binding and during query optimization.

Note that immediately after bind_replace, the generated replacement piece of the query plan is bound; at that point, hive-partitioned file pruning based on pushdown happens.

However, when Iceberg support was initially released, the second mechanism (in query optimization) suffered from the absence of a pushdown rewrite rule for ANTI joins; note that bind_replace generates an anti-join between the main data files and the files with deleted row ids. These rules were implemented in late 2023, so this should now work.
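
For reference, a conceptual sketch (driven from Python here) of that anti-join between data files and positional delete files; the file lists are placeholders and the plan the extension actually generates differs in its details:

```python
import duckdb

data_files = ["data/part-0.parquet"]      # data files listed in the manifests (placeholder)
delete_files = ["data/delete-0.parquet"]  # positional delete files (placeholder)

# Positional delete files store (file_path, pos); the anti-join drops deleted rows.
# The extra filename/file_row_number columns are only needed for the join keys.
query = f"""
SELECT d.*
FROM parquet_scan({data_files}, filename = true, file_row_number = true) AS d
ANTI JOIN parquet_scan({delete_files}) AS del
  ON d.filename = del.file_path AND d.file_row_number = del.pos
"""
print(duckdb.sql(query))
```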

rustyconover commented 6 months ago

Hi @peterboncz,

Thanks for replying.

The SQL query filters do get pushed down to the parquet_scan() nodes created by iceberg_scan(). To apply them, DuckDB reads the footer of each Parquet file.

When an Iceberg table has millions of files, reading all of these footers takes a long time. Iceberg addresses this by storing column metrics in the manifest files and describing detailed "Scan Planning" in its spec:

https://iceberg.apache.org/spec/#scan-planning

I'm looking forward to DuckDB using Iceberg's statistics to plan the table scan; right now the code includes every file in the manifest. By reading the manifest files and using their information to plan the scan, a query against an Iceberg table with a million files can be reduced to scanning just one or two files, especially if the query references only a few partitions via its predicates.

Fokko commented 5 months ago

Thanks for chiming in here @peterboncz. I believe @rustyconover is right here. https://github.com/duckdb/duckdb_iceberg/issues/8 focuses on partitioning (called hidden partitioning in Iceberg, which is slightly different from Hive-style partitioning).

Next to that, Iceberg keeps additional statistics at the file level. For example, there are upper and lower bounds that make it possible to skip files without having to read the Parquet footer. But I don't see the upper_bound field being referenced in the code: https://github.com/search?q=repo%3Aduckdb%2Fduckdb_iceberg+upper_bound&type=code

JichaoS commented 4 months ago

Hey @rustyconover, it looks like the blocker you mentioned (https://github.com/duckdb/duckdb/pull/11806) got merged. Do you think things are now ready for implementation?

febinsathar commented 1 month ago

@rustyconover are you planning to pick this up? thanks

rustyconover commented 1 month ago

Not in the short term.

mike-luabase commented 1 month ago

@febinsathar I have a version partially working here