GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

error while writing with many timestamp data #1223

Closed iqbalaey closed 6 months ago

iqbalaey commented 6 months ago

Environment: Dataproc 2.0, PySpark 3.1 / 3.5, spark-3.1-bigquery-0.37.0.jar / spark-3.5-bigquery-0.37.0.jar

Dataframe Schema

|-- activation_dtm: timestamp (nullable = true)
|-- termination_dtm: timestamp (nullable = true)
|-- active_end_dtm: timestamp (nullable = true)
|-- first_usage_dt: timestamp (nullable = true)
|-- last_usage_dt: timestamp (nullable = true)
|-- first_spcl_topup_dt: timestamp (nullable = true)
|-- first_reg_topup_dt: timestamp (nullable = true)
|-- last_spcl_topup_dt: timestamp (nullable = true)
|-- last_reg_topup_dt: timestamp (nullable = true)
|-- redemption_dtm: timestamp (nullable = true)
|-- last_topup_special_date: timestamp (nullable = true)
|-- last_topup_regular_date: timestamp (nullable = true)
|-- last_topup_special_voice_date: timestamp (nullable = true)
|-- last_topup_special_sms_date: timestamp (nullable = true)
|-- last_topup_special_data_date: timestamp (nullable = true)
|-- first_voice_paid_usage_date: timestamp (nullable = true)
|-- first_voice_free_usage_date: timestamp (nullable = true)
|-- first_sms_paid_usage_date: timestamp (nullable = true)
|-- first_sms_free_usage_date: timestamp (nullable = true)
|-- any_event_first_usage_date: timestamp (nullable = true)
|-- first_ott_usage_dt: timestamp (nullable = true)
|-- last_billing_usage_dt_sk_id: date (nullable = true)

BQ Schema

activation_dtm TIMESTAMP
termination_dtm TIMESTAMP
active_end_dtm TIMESTAMP
first_usage_dt TIMESTAMP
last_usage_dt TIMESTAMP
first_spcl_topup_dt TIMESTAMP
first_reg_topup_dt TIMESTAMP
last_spcl_topup_dt TIMESTAMP
last_reg_topup_dt TIMESTAMP
redemption_dtm TIMESTAMP
last_topup_special_date TIMESTAMP
last_topup_regular_date TIMESTAMP
last_topup_special_voice_date TIMESTAMP
last_topup_special_sms_date TIMESTAMP
last_topup_special_data_date TIMESTAMP
first_voice_paid_usage_date TIMESTAMP
first_voice_free_usage_date TIMESTAMP
first_sms_paid_usage_date TIMESTAMP
first_sms_free_usage_date TIMESTAMP
any_event_first_usage_date TIMESTAMP
first_ott_usage_dt TIMESTAMP
last_billing_usage_dt_sk_id DATE  => partition column
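For context, the target is a daily-partitioned table with last_billing_usage_dt_sk_id as the partition column. Below is a minimal sketch of an equivalent table definition using the google-cloud-bigquery Python client; the project, dataset, and table names are placeholders, and the real table carries many additional non-timestamp fields (sbscrptn_sk_id etc.) that are omitted here:

```python
from google.cloud import bigquery

# All TIMESTAMP columns of the target table, as listed in the BQ schema above.
TIMESTAMP_COLUMNS = [
    "activation_dtm", "termination_dtm", "active_end_dtm", "first_usage_dt",
    "last_usage_dt", "first_spcl_topup_dt", "first_reg_topup_dt",
    "last_spcl_topup_dt", "last_reg_topup_dt", "redemption_dtm",
    "last_topup_special_date", "last_topup_regular_date",
    "last_topup_special_voice_date", "last_topup_special_sms_date",
    "last_topup_special_data_date", "first_voice_paid_usage_date",
    "first_voice_free_usage_date", "first_sms_paid_usage_date",
    "first_sms_free_usage_date", "any_event_first_usage_date",
    "first_ott_usage_dt",
]

client = bigquery.Client(project="my-project")  # placeholder project

schema = [bigquery.SchemaField(c, "TIMESTAMP") for c in TIMESTAMP_COLUMNS]
schema.append(bigquery.SchemaField("last_billing_usage_dt_sk_id", "DATE"))

# Placeholder dataset/table names; the real table has ~60 more columns.
table = bigquery.Table("my-project.my_dataset.my_table", schema=schema)

# Daily partitioning on the DATE column, matching "=> partition column" above.
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="last_billing_usage_dt_sk_id",
)
client.create_table(table, exists_ok=True)
```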

I get this error when doing a direct write to BigQuery from a DataFrame read via JDBC:
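Roughly, the write path looks like the sketch below (the JDBC URL, credentials, and table names are placeholders, not the actual ones from this job):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-to-bq").getOrCreate()

# Read the source table over JDBC (connection details are placeholders).
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/source_db")
    .option("dbtable", "source_schema.subscriptions")
    .option("user", "db_user")
    .option("password", "db_password")
    .load()
)

# Direct write to BigQuery via the Storage Write API; this is the step
# that fails with the AppendSerializationError shown below.
(
    df.write.format("bigquery")
    .option("writeMethod", "direct")
    .mode("append")
    .save("my-project.my_dataset.my_table")
)
```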

ERROR TaskSetManager: task 1.0 in stage 8.0 (TID 22) had a not serializable result: com.google.cloud.spark.bigquery.repackaged.io.grpc.Status
Serialization stack:

  • object not serializable (class: com.google.cloud.spark.bigquery.repackaged.io.grpc.Status, value: Status{code=INVALID_ARGUMENT, description=Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: myentity/streams/Cig2Njk3MjFiYy0wMDAwLTI2OWMtOWZmMi0zYzI4NmQ0MzFkZTY6czEz, cause=null})
  • writeObject data (class: java.lang.Throwable)
  • object (class com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError, com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: myentity/streams/Cig2Njk3MjFiYy0wMDAwLTI2OWMtOWZmMi0zYzI4NmQ0MzFkZTY6czEz)
  • writeObject data (class: java.lang.Throwable)
  • object (class java.util.concurrent.ExecutionException, java.util.concurrent.ExecutionException: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: entity/streams/Cig2Njk3MjFiYy0wMDAwLTI2OWMtOWZmMi0zYzI4NmQ0MzFkZTY6czEz)
  • writeObject data (class: java.lang.Throwable)
  • object (class com.google.cloud.bigquery.connector.common.BigQueryConnectorException, com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Execution Exception while retrieving AppendRowsResponse)
  • field (class: com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler$1, name: val$e, type: class java.lang.Exception)
  • object (class com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler$1, com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler$1@5cb4b82d)
  • element of array (index: 0)
  • array (class [Ljava.lang.Object;, size 1); not retrying

An error occurred while calling o338.save.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: unexpected issue trying to save [sbscrptn_sk_id: bigint, sbscrptn_ek_id: bigint ... 80 more fields]
  at com.google.cloud.spark.bigquery.write.BigQueryDataSourceWriterInsertableRelation.insert(BigQueryDataSourceWriterInsertableRelation.java:128)
  at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:51)
  at com.google.cloud.spark.bigquery.v2.Spark31BigQueryTableProvider.createRelation(Spark31BigQueryTableProvider.java:65)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: task 1.0 in stage 8.0 (TID 22) had a not serializable result: com.google.cloud.spark.bigquery.repackaged.io.grpc.Status