GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

BigQuery Pushdown filtering on Spark 3.4.2 #1207

Open sid-habu opened 3 months ago

sid-habu commented 3 months ago

I have a BigQuery table foo with a DATE column bar_date. I am trying to query this table in Spark 3.4.2 using the spark-bigquery-with-dependencies:0.30.0 connector.

I am unable to get pushdown filtering to work: the physical plan shows PushedFilters: [] and all the data is pulled from BigQuery before the filtering is done in Spark.

Below is my code. I even tried enabling BigQueryConnectorUtils.enablePushdownSession(spark), but found that it isn't supported yet for Spark 3.4+.

// Enabling pushdown session for BigQuery in Spark 3.4.2 throws an error
// https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/1000

//BigQueryConnectorUtils.enablePushdownSession(spark)

spark.sqlContext.read
  ......
  .option("table", "foo")
  .option("viewsEnabled", "true")
  .load()
  .filter(col("bar_date") >= "2024-01-01" && col("bar_date") <= "2024-03-31")

Below is the physical plan, with the table name and requiredColumns stripped. The filter list is empty:

DirectBigQueryRelation: Querying table ......, parameters sent from Spark: requiredColumns=[.......], filters=[]

I am sure I am missing something trivial as I expect this simple filtering to be pushed down to BigQuery.

isha97 commented 3 months ago

@sid-habu Currently, pushdown is only supported up to Spark 3.3.

sid-habu commented 3 months ago

@sid-habu Currently, pushdown is only supported up to Spark 3.3.

@isha97 In that case, can you please confirm that if I use the workaround of passing in a raw query, the filtering will be executed in BigQuery?

spark.sqlContext.read
  ......
  .option("viewsEnabled", "true")
  .option("query", "select x,y from foo where bar_date >= '2024-01-01' && bar_date <= '2024-03-31'")
  .load()
tom-s-powell commented 2 months ago

@isha97 is it true that filters aren't supported in Spark 3.4+? I realise BigQueryConnectorUtils.enablePushdownSession doesn't work, but with Spark 3.4.1 and a Spark filter applied, I see that DirectBigQueryRelation receives a compiled filter that is passed as a rowRestriction in ReadSession.TableReadOptions in ReadSessionCreator.

However, when I look at the project history in the BigQuery console, I see a SELECT * FROM <table-name> query being executed, not one that includes the filters. I don't know whether this is standard behaviour of the BigQuery Storage Read API, though.

This is when using spark-bigquery-with-dependencies_2.12.

davidrabinowitz commented 2 months ago

@tom-s-powell Filter pushdown is enabled by default in all the connector flavors and cannot be disabled. You can track the usage of filters in the application log: both the filters the connector gets from Spark and the compiled filter are logged at the INFO level. Note that you cannot track the connector's usage by looking at the jobs in the project history, as there are no jobs; the data is read directly from the table using the BigQuery Storage Read API via the mentioned ReadSession. Query jobs are created on two occasions only: when reading from views, and when reading from queries (spark.read.format("bigquery").load("SELECT ...")).
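
To illustrate, a minimal Scala sketch of verifying this from the application log; the project, dataset, table, and column names here are placeholders, and spark.sparkContext.setLogLevel is standard Spark API:

import org.apache.spark.sql.functions.col

// Raise the log level so the connector's INFO-level filter lines are visible.
spark.sparkContext.setLogLevel("INFO")

val df = spark.read
  .format("bigquery")
  .option("table", "my_project.my_dataset.foo") // hypothetical table
  .load()
  .filter(col("bar_date") >= "2024-01-01")

df.show()
// The log should now contain a DirectBigQueryRelation line listing the filters
// received from Spark and the compiled filter sent as the ReadSession rowRestriction.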

Regarding BigQueryConnectorUtils.enablePushdownSession() - this belongs to the unreleased query pushdown feature. A more apt name for the method would be enableQueryPushdownSession(), and it will be renamed soon.

tom-s-powell commented 2 months ago

I see, thank you. So in the case of load("SELECT ...") or .option("query", "SELECT ...").load(), how does filter pushdown work? Are filters not pushed down when creating the temp tables, but pushed down when consuming from them?

One use case we have for query is time travel, something like SELECT * FROM <table-name> FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS(%s). In this instance, is there no pushdown? Is there filter/column pushdown on the result?

EDIT: One other question would be around limits, and whether that is a pushdown capability.

davidrabinowitz commented 2 months ago

When loading from a query (load("SELECT ...") or .option("query", "SELECT ...").load()), the connector creates a BigQuery query job and executes the query as given. Further reads from the DataFrame go to the temporary table that materializes the result, and additional filters are pushed into the ReadSession if needed.
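
A minimal sketch of this two-stage behavior; the dataset, table, and column names are placeholders, and materializationDataset is the connector option naming where the temporary table is created:

val df = spark.read
  .format("bigquery")
  .option("viewsEnabled", "true")
  .option("materializationDataset", "my_tmp_dataset") // placeholder dataset for the temp table
  .option("query", "select x, y from my_dataset.foo") // runs once as a BigQuery query job
  .load()

// This filter is not re-run as SQL; it is pushed into the Storage Read API
// ReadSession (as a rowRestriction) against the materialized temporary table.
val filtered = df.filter("x > 10")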

In the SELECT * FROM <table-name> FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS(%s) case there are no filters or column projections, so the resulting table is an exact snapshot of the table at the given timestamp. Any reads from the DataFrame should push both filters and column projection into the ReadSession aimed at the temporary table.
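
For illustration, a sketch of that time-travel pattern under the same assumptions; the table name, dataset, and timestamp are placeholders:

// Hypothetical snapshot time: 2024-01-01T00:00:00Z as epoch millis.
val asOfMillis = 1704067200000L
val query =
  s"SELECT * FROM `my_project.my_dataset.foo` FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS($asOfMillis)"

val snapshot = spark.read
  .format("bigquery")
  .option("viewsEnabled", "true")
  .option("materializationDataset", "my_tmp_dataset") // placeholder
  .option("query", query)
  .load()

// Per the explanation above, this projection and filter are pushed into the
// ReadSession aimed at the materialized snapshot table.
snapshot.select("x", "bar_date").filter("bar_date >= '2024-01-01'").show()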

If the question is whether we can push down df.head(20) as the equivalent of SELECT ... LIMIT 20, the answer is not yet, as the BigQuery Storage Read API does not provide this capability at the moment. A workaround is to run load("SELECT ... LIMIT ..."), but this solution is less flexible as the number of retrieved rows is fixed.
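
A short sketch of the difference, assuming a DataFrame df loaded as in the examples above (names remain placeholders):

// Not pushed down: rows stream through the Storage Read API and Spark
// truncates to 20 on its side.
val first20 = df.head(20)

// Workaround: bake the limit into the query job itself. Less flexible, since
// the retrieved row count is fixed at load time.
val limited = spark.read
  .format("bigquery")
  .option("viewsEnabled", "true")
  .option("query", "select x, y from my_dataset.foo limit 20")
  .load()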

tom-s-powell commented 2 months ago

Thanks for the explanation. And is there no way of using the BigQuery Storage Read API to query time travel without creating a temporary table? I'm assuming there's a cost associated with that.

The other case we have is partitioned tables. We have had reports that partitions are not pruned, but I assumed that is because we are using query, and thus filters aren't pushed into the job that creates the temporary table. Is there a solution here, or would the filter need to be included in the query (as sketched below)?
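
For reference, a sketch of the in-query approach just mentioned, assuming foo is partitioned on bar_date (all names are placeholders); putting the predicate in the query text lets the query job itself prune partitions when materializing the temporary table:

val pruned = spark.read
  .format("bigquery")
  .option("viewsEnabled", "true")
  .option("query",
    """select x, y
      |from my_dataset.foo
      |where bar_date >= '2024-01-01' and bar_date < '2024-04-01'""".stripMargin)
  .load()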

parthshyara commented 1 month ago

this belongs to the unreleased feature of query pushdown

@davidrabinowitz @isha97 Is there a timeline for when this feature will be released for Spark 3.5? We have a strong use case for it, and without it we're incurring huge unnecessary costs. I've already seen a couple of pending/closed issues related to this feature request.

parthshyara commented 1 month ago

An additional question related to the previous comment: is the pushdown only supported on tables and not views, i.e. if viewsEnabled is set to true, will the pushdown not be applied? (For the purpose of this question, assume spark-3.3-bigquery is used, which has pushdown implemented.)

parthshyara commented 1 month ago

@davidrabinowitz Following up on the previous questions.