GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

Unable to write to a partitioned table in spark 3.5 with connector version spark-3.4-bigquery-0.34.0 #1228

Closed: hierr closed this issue 5 months ago

hierr commented 6 months ago

I am getting the following error:

com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery

Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Invalid table ID "tb_rh_n1$202405166494015418172". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used.

Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request

When trying to write to the following table:

(screenshots of the BigQuery table's details, schema, and partition configuration)

With the following code in spark 3.5 with connector version spark-3.4-bigquery-0.34.0:

(screenshot of the PySpark write code)
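The code screenshot isn't reproduced here; below is a minimal PySpark sketch of what such a partitioned, indirect write typically looks like with this connector. The DataFrame, column, bucket, and table names are placeholders, not taken from the original screenshot.

    from datetime import date
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("bq-partitioned-write").getOrCreate()

    # Toy stand-in for the real DataFrame; "dt" is a placeholder date column.
    df_base = spark.createDataFrame(
        [(1, "a", date(2024, 5, 16))], ["id", "value", "dt"]
    )

    (
        df_base.write.format("bigquery")
        .mode("overwrite")
        # Indirect write: rows are staged in GCS and then loaded into BigQuery.
        .option("temporaryGcsBucket", "my-temp-bucket")  # placeholder bucket
        # Day-partitioned target table and the specific partition to write.
        .option("partitionField", "dt")
        .option("partitionType", "DAY")
        .option("datePartition", "20240516")
        .save("my-project.my_dataset.tb_rh_n1")          # placeholder project/dataset
    )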

isha97 commented 6 months ago

Hi @hierr ,

I am unable to reproduce this issue; could you please provide the full stack trace? I would also suggest using Dynamic Partition Overwrite if you only want to overwrite the partitions contained in df_base. Please check the README for the usage of the spark.sql.sources.partitionOverwriteMode config.
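For reference, a minimal sketch of the dynamic partition overwrite setup being suggested here, based on the README's description of spark.sql.sources.partitionOverwriteMode; the bucket and table names are placeholders:

    from datetime import date
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("bq-dynamic-overwrite").getOrCreate()

    # With DYNAMIC mode, only the partitions present in df_base are replaced;
    # all other partitions of the target table are left untouched.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")

    df_base = spark.createDataFrame(
        [(1, "a", date(2024, 5, 16))], ["id", "value", "dt"]  # toy data
    )

    (
        df_base.write.format("bigquery")
        .mode("overwrite")
        .option("temporaryGcsBucket", "my-temp-bucket")  # placeholder bucket
        .save("my-project.my_dataset.tb_rh_n1")          # placeholder table
    )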

hierr commented 6 months ago

Hi @isha97,

I am already running with only partitionOverwriteMode = DYNAMIC and the temporaryGcsBucket set, since I am running Spark on Dataproc (image version 2.2.15-debian12, which is compatible with Spark 3.5 and the 0.34.0 connector). Here is the full stack trace:

Py4JJavaError: An error occurred while calling o108.save.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
    at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:147)
    at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
    at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:54)
    at com.google.cloud.spark.bigquery.v2.Spark31BigQueryTableProvider.createRelation(Spark31BigQueryTableProvider.java:65)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:473)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:473)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:449)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Invalid table ID "tb_rh_n1$202405161868823060940". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used.
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:114)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.create(HttpBigQueryRpc.java:186)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$2.call(BigQueryImpl.java:304)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$2.call(BigQueryImpl.java:301)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.run(RetryHelper.java:76)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:300)
    at com.google.cloud.bigquery.connector.common.BigQueryClient.createTempTable(BigQueryClient.java:206)
    at com.google.cloud.bigquery.connector.common.BigQueryClient.createTempTableAfterCheckingSchema(BigQueryClient.java:224)
    at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:132)
    ... 44 more
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request

I've found it to be a very weird error (I was losing my mind over here), since I've been doing this partitioned write with no issues for a long time, even with this same Dataproc + Spark + connector configuration.

isha97 commented 6 months ago

@hierr If you are using partitionOverwriteMode as DYNAMIC, you don't need to set the other options such as "partitionType", "partitionField", and "datePartition". It will automatically overwrite the required partitions.

Can you also please grep the logs for "tb_rh_n1$202405161868823060940"? Do you see any log for "Deleting table"?

You mentioned that this error is only recent and that the write was working as expected previously; did something change?

hierr commented 6 months ago

Well, it's actually news to me that this got implemented, but it works, so good news! By "been doing this write" I meant that I've been doing it like this in general, but not in this particular cluster.

If I take the config off and set datePartition (just as a test), it keeps throwing the error though. There are no logs for that table in particular, nor for the one from the error I just got. Do you understand why it is trying to write to a table ID in this format? It seems weird to me, since the table is day-partitioned and the error also says that "Table decorators cannot be used".

isha97 commented 5 months ago

@hierr We create a temporary table to write to a specific partition, but it should be just tb_rh_n1$20240516, which is what I got when I tried to reproduce. I am not able to reproduce the generation of the whole table name tb_rh_n1$202405166494015418172.
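For context, the "$" suffix in those IDs is the standard BigQuery partition decorator for a day-partitioned table; a small illustration of the expected target versus the malformed ID from the stack trace (names taken from this thread):

    # The partition decorator for a day-partitioned table is "<table>$YYYYMMDD".
    table = "tb_rh_n1"
    date_partition = "20240516"

    expected_target = f"{table}${date_partition}"        # "tb_rh_n1$20240516"
    observed_target = "tb_rh_n1$202405166494015418172"   # extra digits after the date

    # The observed ID has additional digits appended after the date portion,
    # which is why BigQuery rejects it as an invalid table ID.
    print(expected_target)
    print(observed_target)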

From your previous comment, are you unblocked now by using partitionOverwriteMode as DYNAMIC?

hierr commented 5 months ago

@isha97 Yeah, I don't see a reason for that since the code specifies '20240516' in datePartition. Tbh this cluster is behaving weirdly, throwing random errors and recovering later, so there is that.

But yes, I am unblocked now, so closing this. Many thanks!