GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

Unable to write to BigQuery table with require_partition_filter = true when spark.sql.sources.partitionOverwriteMode is set to DYNAMIC #1285

Closed ktchana closed 1 month ago

ktchana commented 2 months ago

A BigQuery table is created like this:

CREATE TABLE `spark_test.mytable`
(
  id INT64 NOT NULL,
  name STRING,
  txndate DATE NOT NULL
)
PARTITION BY txndate;

The following Scala Spark code is used to populate the table with dummy data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import java.sql.Date

val data = Seq(
  Row(1, "Alice", Date.valueOf("2023-03-01")),
  Row(2, "Bob", Date.valueOf("2023-03-02")),
  Row(3, "Charlie", Date.valueOf("2023-03-03"))
)

val schema = StructType(List(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType),
  StructField("txndate", DateType, nullable = false)
))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.write
  .format("bigquery")
  .option("table", "spark_test.mytable")
  .option("temporaryGcsBucket", "my-temp-gcs-123")
  .option("writeMethod", "indirect")
  .option("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
  .mode("overwrite")
  .save()

This works as expected (i.e., the table partitions present in the DataFrame are overwritten). However, when we alter the table to enable the require_partition_filter option:

ALTER TABLE spark_test.mytable
  SET OPTIONS (
    require_partition_filter = true);
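
A quick sanity check (not part of the original report): once require_partition_filter is enabled, any query that scans the table has to constrain txndate, and a plain scan is rejected with the same "Cannot query over table ... without a filter" error shown in the stack trace below.

SELECT COUNT(*) FROM `spark_test.mytable`;
-- rejected: no filter over the partitioning column txndate

SELECT COUNT(*) FROM `spark_test.mytable` WHERE txndate = DATE "2023-03-01";
-- allowed: the filter on txndate can be used for partition elimination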

The same df.write call then fails, complaining about the lack of a partition filter in the query:

scala>   .save()
com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
  at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:157)
  at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
  at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:54)
  at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
  at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
  ... 53 elided
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Query error: Cannot query over table 'spark_test.mytable' without a filter over column(s) 'txndate' that can be used for partition elimination at [2:1]
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:116)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getQueryResults(HttpBigQueryRpc.java:745)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1500)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1495)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:102)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1494)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1478)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job$1.call(Job.java:390)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job$1.call(Job.java:387)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:102)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitForQueryResults(Job.java:386)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitForInternal(Job.java:281)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitFor(Job.java:202)
  at com.google.cloud.bigquery.connector.common.BigQueryClient.waitForJob(BigQueryClient.java:131)
  at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:151)
  ... 77 more
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
GET https://bigquery.googleapis.com/bigquery/v2/projects/my-spark-bigquery-demo/queries/c80fb709-7aa0-4e1c-8fae-4e806bcfc83a?location=europe-west2&maxResults=0&prettyPrint=false
{
  "code": 400,
  "errors": [
    {
      "domain": "global",
      "location": "q",
      "locationType": "parameter",
      "message": "Query error: Cannot query over table 'spark_test.mytable' without a filter over column(s) 'txndate' that can be used for partition elimination at [2:1]",
      "reason": "invalidQuery"
    }
  ],
  "message": "Query error: Cannot query over table 'spark_test.mytable' without a filter over column(s) 'txndate' that can be used for partition elimination at [2:1]",
  "status": "INVALID_ARGUMENT"
}
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$3.interceptResponse(AbstractGoogleClientRequest.java:479)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:565)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:506)
  at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:616)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getQueryResults(HttpBigQueryRpc.java:743)
  ... 94 more

scala> 

Tested with the following:

Spark 3.1.3 on Dataproc
Scala 2.12
BigQuery Connector: spark-bigquery-with-dependencies_2.12-0.40.0.jar

ktchana commented 2 months ago

Looking at the BigQuery query logs, the connector appears to be executing the following code block:

DECLARE
  partitions_to_delete DEFAULT (
  SELECT
    ARRAY_AGG(DISTINCT(TIMESTAMP_TRUNC(`txndate`, DAY)) IGNORE NULLS)
  FROM
    `spark_test.mytable2316280012714`);

MERGE
  `spark_test.mytable` AS TARGET
USING
  `spark_test.mytable2316280012714` AS SOURCE
ON
  FALSE
  WHEN NOT MATCHED BY SOURCE AND TIMESTAMP_TRUNC(`target`.`txndate`, DAY) IN UNNEST(partitions_to_delete) THEN DELETE
  WHEN NOT MATCHED BY TARGET
  THEN
INSERT
  (`id`,
    `name`,
    `txndate`)
VALUES
  (`id`,`name`,`txndate`);

in which the partitioning column txndate is referenced through a call to the TIMESTAMP_TRUNC function. I believe this is what prevents the BigQuery optimizer from using the predicate for partition elimination, so it rejects the query.
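
A rough illustration of the suspected type mismatch (not taken from the connector's code): txndate is a DATE column, but TIMESTAMP_TRUNC implicitly coerces it to TIMESTAMP, so the generated predicate compares a value derived from the partitioning column rather than the column itself.

SELECT TIMESTAMP_TRUNC(DATE "2023-03-01", DAY);
-- returns a TIMESTAMP (2023-03-01 00:00:00 UTC), no longer the table's DATE partitioning type

SELECT DATE(TIMESTAMP_TRUNC(DATE "2023-03-01", DAY));
-- returns DATE 2023-03-01, directly comparable to txndate, as in the workaround below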

A potential workaround for date-partitioned tables would be to change the above block into something like this:

DECLARE
  partitions_to_delete DEFAULT (
  SELECT
    ARRAY_AGG(DISTINCT(DATE(TIMESTAMP_TRUNC(`txndate`, DAY))) IGNORE NULLS)
  FROM
    `spark_test.mytable2316280012714`);

MERGE
  `spark_test.mytable` AS TARGET
USING
  `spark_test.mytable2316280012714` AS SOURCE
ON
  FALSE
  WHEN NOT MATCHED BY SOURCE AND `target`.`txndate` IN UNNEST(partitions_to_delete) THEN DELETE
  WHEN NOT MATCHED BY TARGET
  THEN
INSERT
  (`id`,
    `name`,
    `txndate`)
VALUES
  (`id`,`name`,`txndate`);
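
Until the connector handles this case, a possible stopgap (assuming it is acceptable to relax the requirement for the duration of the write) is to toggle the option around the Spark job:

ALTER TABLE spark_test.mytable
  SET OPTIONS (require_partition_filter = false);

-- run the df.write ... overwrite from Spark here

ALTER TABLE spark_test.mytable
  SET OPTIONS (require_partition_filter = true);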

Any chance this could be fixed in the next connector version?

isha97 commented 1 month ago

Fixed as part of https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1310