GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

exception: BigQueryException: Provided Schema does not match Table #1294

Open VIKCT001 opened 3 days ago

VIKCT001 commented 3 days ago

We are using the BigQuery connector to insert/update data in BigQuery. Recently we migrated to Dataproc image version 2.2.29-debian12, and we configured the cluster with the metadata parameter SPARK_BQ_CONNECTOR_VERSION=0.41.0.

We compiled our application jar with Scala 2.12.18 and Spark 3.5.0, which seems to be compatible with the given Dataproc image version (as per the documentation). Our application code reads a file into a DataFrame, creates a partitioned table, and writes the DataFrame into it. This process was working fine with the old Dataproc image 2.0 and the old BigQuery connector.
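
For reference, a minimal build.sbt sketch of the build described above (versions are taken from this thread; marking the dependencies as provided is an assumption, since the Dataproc image supplies Spark and the connector is injected via SPARK_BQ_CONNECTOR_VERSION):

  scalaVersion := "2.12.18"

  libraryDependencies ++= Seq(
    // Spark ships with the Dataproc image, so it is not bundled into the jar
    "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
    // assumed: the connector also comes from the cluster, not the application jar
    "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.41.0" % "provided"
  )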

With the new Dataproc image and the new connector version, we are facing a schema mismatch issue.

Our application code first creates a table partitioned on a timestamp column and then writes the DataFrame to it. That write is where our code fails.
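
For context, a minimal sketch of that flow (input path, table, and column names such as event_ts are placeholders, not the job's real names):

  import org.apache.spark.sql.{SaveMode, SparkSession}

  val spark = SparkSession.builder().appName("ingest-sketch").getOrCreate()

  // Read the input file into a DataFrame (placeholder path and format).
  val df = spark.read.option("header", "true").csv("gs://some-bucket/input/")

  // Append to a table partitioned on a timestamp column. If the table does not
  // exist yet, the connector creates it from the DataFrame's schema, so the
  // two cannot drift apart.
  df.write
    .format("bigquery")
    .mode(SaveMode.Append)
    .option("table", "some-project.some_dataset.some_table")
    .option("partitionField", "event_ts")        // timestamp column to partition on
    .option("partitionType", "DAY")              // daily partitions
    .option("temporaryGcsBucket", "some-bucket") // staging bucket for the indirect write
    .save()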

Below is the stack trace:

"stacktrace": [ "com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery", "\tat com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:157)", "\tat com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)", "\tat com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:51)", "\tat com.google.cloud.spark.bigquery.v2.Spark31BigQueryTableProvider.createRelation(Spark31BigQueryTableProvider.java:74)", "\tat org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)", "\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)", "\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)", "\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)", "\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)", "\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)", "\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)", "\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)", "\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)", "\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:473)", "\tat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)", "\tat org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:473)", "\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)", "\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)", "\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)", "\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)", "\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)", "\tat org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:449)", "\tat org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)", "\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)", "\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)", "\tat org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)", "\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)", "\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)", 
"\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)", "\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)", "\tat com.pythian.edp.pm.spark.io.BigQueryWriter.write(BigQueryWriter.scala:97)", "\tat com.pythian.edp.pm.spark.job.mixin.WriterSupport.write(WriterSupport.scala:15)", "\tat com.pythian.edp.pm.spark.job.mixin.WriterSupport.write$(WriterSupport.scala:11)", "\tat com.pythian.edp.pm.spark.job.DigiFactIngestionSparkJob.write(DigiFactIngestionSparkJob.scala:34)", "\tat com.pythian.edp.pm.spark.job.DigiFactIngestionSparkJob.writeDatFrame(DigiFactIngestionSparkJob.scala:290)", "\tat com.pythian.edp.pm.spark.job.DigiFactIngestionSparkJob.$anonfun$processInternal$16(DigiFactIngestionSparkJob.scala:170)", "\tat scala.Option.map(Option.scala:230)", "\tat com.pythian.edp.pm.spark.job.DigiFactIngestionSparkJob.processInternal(DigiFactIngestionSparkJob.scala:151)", "\tat com.pythian.edp.pm.spark.job.DigiFactIngestionSparkJob.process(DigiFactIngestionSparkJob.scala:56)", "\tat com.pythian.edp.pm.spark.job.DigiSparkJob$.main(DigiSparkJob.scala:158)", "\tat com.pythian.edp.pm.spark.job.DigiSparkJob.main(DigiSparkJob.scala)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)", "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)", "\tat org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)", "\tat org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1032)", "\tat org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)", "\tat org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)", "\tat org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)", "\tat org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1124)", "\tat org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1133)", "\tat org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)", "Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Provided Schema does not match Table bmas-eu-pm-dnb-uat-data:PM_INGEST.ERI_NR5G_NRCELLCU10. Cannot add fields (field: SITE)", "\tat com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.reload(Job.java:471)", "\tat com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitForInternal(Job.java:290)", "\tat com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitFor(Job.java:202)", "\tat com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:518)", "\tat com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:508)", "\tat com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:809)", "\tat com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.java:179)", "\tat com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:153)", "\t... 55 more" ],

VIKCT001 commented 2 days ago

If we set the allowFieldAddition property to true, it works:

  df.write
    .format("bigquery")
    .mode(SaveMode.Append)
    .option("table", s"${conf.location.project}.${conf.location.dataset}.${conf.location.table}")
    .option("project", conf.location.project)
    .option("allowFieldAddition", "true")
    .save()

davidrabinowitz commented 1 day ago

Can you please share the BigQuery table and the DataFrame schemas so we can check this?
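
One minimal way to capture both schemas (a sketch, assuming application default credentials and that df is the DataFrame being written; the table coordinates are placeholders):

  import com.google.cloud.bigquery.{BigQueryOptions, StandardTableDefinition, TableId}

  // Print the BigQuery table's schema as the service sees it.
  val bq = BigQueryOptions.getDefaultInstance.getService
  val tableDef: StandardTableDefinition =
    bq.getTable(TableId.of("some-project", "some_dataset", "some_table")).getDefinition()
  println(tableDef.getSchema)

  // Print the Spark DataFrame's schema for comparison.
  df.printSchema()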

VIKCT001 commented 1 day ago

  override def write(df: DataFrame): Unit = {
    // Merge the default options with the job-specific ones.
    val params = mutable.Map(defaultOptions.++(conf.params).toSeq: _*)
    println(s"params: $params")
    val temporaryGcsBucketName = params.get("temporaryGcsBucket").get.toString

    // Apply every param to the writer, dispatching on the value's type.
    params.foldLeft(df.write) { case (dfw, (name, value)) =>
      value match {
        case v: Boolean => dfw.option(name, v)
        case v: Double  => dfw.option(name, v)
        case v: Long    => dfw.option(name, v)
        case v: String  => dfw.option(name, v)
      }
    }
      .format("bigquery")
      .mode(SaveMode.Append)
      .option("table", s"${conf.location.project}.${conf.location.dataset}.${conf.location.table}")
      .option("project", conf.location.project)
      .option("allowFieldAddition", "true")
      .save()
  }

Here we are passing the bucket name in params:

  params: Map(temporaryGcsBucket -> test-bucket, project -> test-bucket)

Our code first creates a partitioned table in BigQuery with two columns, and only later writes the DataFrame to it. Our application code was working fine up to the Dataproc image 2.0, but now we have to add the allowFieldAddition parameter to make it work.
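
A likely reason, inferred from the stack trace (BigQueryClient.loadDataIntoTable) rather than confirmed: the indirect write stages the DataFrame to GCS and then runs a BigQuery load job, and a load job cannot add new fields (here SITE) to an existing table unless the ALLOW_FIELD_ADDITION schema update option is set; the connector's allowFieldAddition option adds that setting to the load job it runs internally. A minimal sketch of the equivalent flag on a hand-built load job, with placeholder names and staging URI:

  import com.google.cloud.bigquery.{FormatOptions, LoadJobConfiguration, TableId}
  import com.google.cloud.bigquery.JobInfo.SchemaUpdateOption

  // Load staged files into an existing table, allowing the job to add any
  // fields that are present in the data but missing from the table's schema.
  val loadConfig = LoadJobConfiguration
    .newBuilder(
      TableId.of("some-project", "some_dataset", "some_table"),
      "gs://some-bucket/staging/part-*.parquet",
      FormatOptions.parquet())
    .setSchemaUpdateOptions(
      java.util.Arrays.asList(SchemaUpdateOption.ALLOW_FIELD_ADDITION))
    .build()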

Our application code is compiled with Scala 2.12.18 and Spark 3.5, and we are executing it on the 2.2.29-debian12 image.