pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Use BigQuery Dataframes as Read-Connector to BigQuery #17326

Open OELSJAN opened 2 months ago

OELSJAN commented 2 months ago

Description

BigQuery launched a feature named "BigQuery Dataframes":

BigQuery DataFrames provides a Pythonic DataFrame and machine learning (ML) API powered by the BigQuery engine.

With this, you can execute pandas operations directly on the BigQuery engine. Maybe this API could be used to implement a better connector to BigQuery, one that also supports lazy optimizations such as filter pushdown, instead of using from_arrow with a hardcoded query executed by the Python BigQuery client.
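For context, the current eager pattern looks roughly like this (a minimal sketch with a made-up query; everything the SQL returns is materialized client-side before polars sees it):

```python
import polars as pl
from google.cloud import bigquery

client = bigquery.Client()

# The query is written by hand; polars cannot push filters or
# projections into it, so every row it returns is downloaded.
query = """
    SELECT name, number, year
    FROM `bigquery-public-data.usa_names.usa_1910_current`
    WHERE year >= 2000
"""
rows = client.query(query).result()
df = pl.from_arrow(rows.to_arrow())
```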

tswast commented 2 weeks ago

Thanks @OELSJAN for the feature request! I'm curious to hear if there's anything we can do in BigQuery DataFrames to make these filter pushdown features easier to implement.

Also, for your awareness, we have on our backlog a request to make an official polars connector for BigQuery (watch issue https://github.com/googleapis/python-bigquery/issues/1979 for updates). I suspect that might be a good place to implement such optimizations, as a separate package could make dependencies a little easier to manage.

See also this request for some polars support on the BigQuery DataFrames repo: https://github.com/googleapis/python-bigquery-dataframes/issues/735, which is mostly focused on the I/O piece, similar to this request.

tswast commented 1 week ago

A question: are there other I/O methods that support push-down to the storage layer? I'm curious what hooks are available for such functionality.

Edit: Two reasons for asking: (1) it'd be lovely to hook into the existing optimizations via some extension mechanism (note that many of these, such as row and column filters, are supported by the BQ Storage Read API), and (2) it'd be great to introduce even more pushdown types, as BigQuery DataFrames supports aggregations, joins (to other BigQuery data sources, or even local data if uploaded to a temp table or small enough to inline in SQL), and more.
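For example (a rough, untested sketch with a hypothetical project ID), an aggregation in BigQuery DataFrames compiles to SQL and runs in the BigQuery engine, so only the aggregated result is downloaded:

```python
import bigframes.pandas as bpd

bpd.options.bigquery.project = "my-project"  # hypothetical billing project

bdf = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_current")

# The groupby/sum is compiled to SQL and executed in BigQuery;
# only the small aggregated result is pulled back to the client.
totals = bdf.groupby("year")["number"].sum().to_pandas()
print(totals.head())
```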

alexander-beedie commented 1 week ago

A question: are there other I/O methods that support push-down to the storage layer? I'm curious what hooks are available for such functionality.

@tswast: Yes - for example, the Polars Iceberg integration[^1] supports various pushdown optimisations including predicates, range queries, and suchlike 👍
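From the user side it's just a lazy scan; filters and projections applied before `collect()` get pushed into the Iceberg read. A minimal sketch (the metadata path and column names are made up):

```python
import polars as pl

# Hypothetical Iceberg table metadata location.
lf = pl.scan_iceberg("s3://my-bucket/warehouse/events/metadata/v2.metadata.json")

df = (
    lf.filter(pl.col("event_date") >= "2024-01-01")  # predicate pushdown
    .select("event_date", "user_id")                 # projection pushdown
    .collect()
)
```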

On a side-note, I've been meaning to look at integrating the BigQuery Client object as a valid connection type for our pl.read_database function for a while... should really get around to it!

[^1]: feat(python): Add support for Iceberg (https://github.com/pola-rs/polars/pull/10375)

tswast commented 1 week ago

Yes - for example, the Polars Iceberg integration

Very cool!

From what I've heard from folks (e.g. https://github.com/googleapis/python-bigquery/issues/1979), it's important to avoid unnecessary dependencies on pyarrow, so it's good to see that _scan_python_function has an option to avoid that dependency.

How stable is this interface? I'm curious whether this sort of connector is best contributed directly to the polars package or provided by a separate package (similar to how pandas refactored its BigQuery support out into pandas-gbq years ago).

Since this proposed feature is to go beyond predicates to potentially turning aggregates and such into BigQuery queries, maybe it's best to stick to the pola-rs/polars repo for now, as that scan functionality improves/extends?

tswast commented 1 week ago

I've started a BigQuery + polars gist with some ideas. I'll try to keep that up to date as I experiment with reads and writes. The first experiment, bigquery-to-polars-no-pyarrow.ipynb, is a barebones read that doesn't require pyarrow to go from BigQuery table -> polars DataFrame.

This could be extended further to support a scan_bigquery with no extra dependencies (such as bigframes), as the BigQuery Storage Read API supports predicates (see row_restriction in https://cloud.google.com/python/docs/reference/bigquerystorage/latest/google.cloud.bigquery_storage_v1.types.ReadSession.TableReadOptions). Maybe I'll give that a try soon, as it sounds like a fun project. :-)
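To illustrate (a rough, untested sketch with made-up table/filter values, and it uses pyarrow for brevity even though the gist avoids it), `selected_fields` and `row_restriction` give column and predicate pushdown at the storage layer:

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
import pyarrow as pa
import polars as pl

project_id = "my-project"  # hypothetical billing project
table_path = "projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_current"

client = bigquery_storage_v1.BigQueryReadClient()
session = client.create_read_session(
    parent=f"projects/{project_id}",
    read_session=types.ReadSession(
        table=table_path,
        data_format=types.DataFormat.ARROW,
        read_options=types.ReadSession.TableReadOptions(
            selected_fields=["name", "year", "number"],  # column pushdown
            row_restriction="year >= 2000",              # predicate pushdown
        ),
    ),
    max_stream_count=1,
)

# Each stream yields Arrow record batches; assumes at least one row matches.
reader = client.read_rows(session.streams[0].name)
batches = [page.to_arrow() for page in reader.rows(session).pages]
df = pl.from_arrow(pa.Table.from_batches(batches))
```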

Edit: Note that making this work for queries could go a few ways. It's complicated because we're dealing with multiple APIs: the BigQuery REST API for queries and the BigQuery Storage Read API for tables:

  1. Forget trying to avoid pyarrow and use the query results' to_arrow() method.
  2. Make sure we always use the "jobs.insert" REST API by calling query(), not query_and_wait(), pull out the destination table (which is always created for query jobs, but not necessarily with query_and_wait), then use the BQ Storage Read API as in my gist (sketched below).
  3. Use query_and_wait, but then somehow switch between downloading results as Python objects via the REST API (should only do this for small results) or, if there is a destination table, using it with the BQ Storage Read API.
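A rough sketch of option 2 (untested, with a hypothetical query; again using pyarrow for brevity), going from query() to the destination table to the BQ Storage Read API:

```python
from google.cloud import bigquery, bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
import pyarrow as pa
import polars as pl

bq = bigquery.Client()
job = bq.query(
    "SELECT name, number FROM `bigquery-public-data.usa_names.usa_1910_current` "
    "WHERE year >= 2000"
)
job.result()  # wait for completion; query() jobs get an (anonymous) destination table
dest = job.destination
table_path = f"projects/{dest.project}/datasets/{dest.dataset_id}/tables/{dest.table_id}"

storage = bigquery_storage_v1.BigQueryReadClient()
session = storage.create_read_session(
    parent=f"projects/{dest.project}",  # assumes the destination lives in the billing project
    read_session=types.ReadSession(table=table_path, data_format=types.DataFormat.ARROW),
    max_stream_count=1,
)
reader = storage.read_rows(session.streams[0].name)
batches = [page.to_arrow() for page in reader.rows(session).pages]
df = pl.from_arrow(pa.Table.from_batches(batches))
```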

That said, maybe query support is not necessary, since we will have read_database compatibility and want a scan_bigquery API with predicate pushdown support? If scan_bigquery pushes down more than just predicates, though, such as aggregates to the query API, then we would need to solve this.