GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0
378 stars 198 forks source link

Add support for BigQuery tables with Collation #1279

Closed jster1357 closed 3 months ago

jster1357 commented 3 months ago

The spark connector for BigQuery has issues handling tables with collation enabled. Collation has been around since March 2023.

Testing was done with dataset level collation enabled.

When going against the base table using the direct, non-sql method it was successful. So this is the only known workaround at this time.

The follow stack trace is generated in the following scenarios:

Py4JJavaError: An error occurred while calling o88.showString.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Error creating destination table using the following query: [SELECT `a`,`b`,`c` FROM `generalproject-340815.test_no_collation.t1_vw` ]
    at com.google.cloud.bigquery.connector.common.BigQueryClient.materializeTable(BigQueryClient.java:491)
    at com.google.cloud.bigquery.connector.common.BigQueryClient.materializeViewToTable(BigQueryClient.java:473)
    at com.google.cloud.bigquery.connector.common.ReadSessionCreator.getActualTable(ReadSessionCreator.java:182)
    at com.google.cloud.bigquery.connector.common.ReadSessionCreator.getActualTable(ReadSessionCreator.java:168)
    at com.google.cloud.bigquery.connector.common.ReadSessionCreator.create(ReadSessionCreator.java:72)
    at com.google.cloud.spark.bigquery.direct.BigQueryRDDFactory.createRddFromTable(BigQueryRDDFactory.java:134)
    at com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation.buildScan(DirectBigQueryRelation.java:130)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:362)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:396)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:475)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:395)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:362)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:461)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Provided Schema does not match Table generalproject-340815:test_no_collation._bqc_163ff1a4afe044a1b1cbc3b9c314847a. Invalid schema update. Updating field collation is not supported. Field a has changed collation from "und:ci" to ""
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
    at com.google.cloud.bigquery.connector.common.BigQueryClient.materializeTable(BigQueryClient.java:479)
    ... 75 more
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Provided Schema does not match Table generalproject-340815:test_no_collation._bqc_163ff1a4afe044a1b1cbc3b9c314847a. Invalid schema update. Updating field collation is not supported. Field a has changed collation from "und:ci" to ""
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.patch(HttpBigQueryRpc.java:284)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$15.call(BigQueryImpl.java:690)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$15.call(BigQueryImpl.java:687)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.run(RetryHelper.java:76)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.update(BigQueryImpl.java:686)
    at com.google.cloud.bigquery.connector.common.BigQueryClient.update(BigQueryClient.java:319)
    at com.google.cloud.bigquery.connector.common.BigQueryClient$DestinationTableBuilder.createTableFromQuery(BigQueryClient.java:685)
    at com.google.cloud.bigquery.connector.common.BigQueryClient$DestinationTableBuilder.call(BigQueryClient.java:662)
    at com.google.cloud.bigquery.connector.common.BigQueryClient$DestinationTableBuilder.call(BigQueryClient.java:637)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
    ... 78 more
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
POST https://www.googleapis.com/bigquery/v2/projects/generalproject-340815/datasets/test_no_collation/tables/_bqc_163ff1a4afe044a1b1cbc3b9c314847a?prettyPrint=false
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "Provided Schema does not match Table generalproject-340815:test_no_collation._bqc_163ff1a4afe044a1b1cbc3b9c314847a. Invalid schema update. Updating field collation is not supported. Field a has changed collation from \"und:ci\" to \"\"",
    "reason" : "invalid"
  } ],
  "message" : "Provided Schema does not match Table generalproject-340815:test_no_collation._bqc_163ff1a4afe044a1b1cbc3b9c314847a. Invalid schema update. Updating field collation is not supported. Field a has changed collation from \"und:ci\" to \"\"",
  "status" : "INVALID_ARGUMENT"
}
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:439)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:525)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.patch(HttpBigQueryRpc.java:282)
    ... 93 more
davidrabinowitz commented 3 months ago

A few questions:

davidrabinowitz commented 3 months ago

Based on a offline chat, this works with the latest version.