GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0
378 stars 198 forks source link

Internal Error when writing Pyspark df to partitioned Bigquery table (Dataproc) #1300

Closed lorenh516 closed 1 month ago

lorenh516 commented 1 month ago

I've recently been running into an issue when I try to write a PySpark df to an existing, partitioned BigQuery table via Dataproc. I'm getting an internal error from Spark related to a java.lang.NullPointerException.

py4j.protocol.Py4JJavaError: An error occurred while calling o913.save.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: unexpected issue trying to save [period_start: date, period_end: date ... 20 more fields]
    at com.google.cloud.spark.bigquery.write.BigQueryDataSourceWriterInsertableRelation.insert(BigQueryDataSourceWriterInsertableRelation.java:128)
...
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase optimization failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
    at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
...
    at com.google.cloud.spark.bigquery.write.BigQueryDataSourceWriterInsertableRelation.insert(BigQueryDataSourceWriterInsertableRelation.java:70)
    ... 43 more
Caused by: java.lang.NullPointerException
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:903)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.collect.SingletonImmutableList.<init>(SingletonImmutableList.java:41)

The error only seems to occur when I am writing a ~table~ df that is the result of a join. I've verified that I don't have empty values in the columns that are required in the BQ table. How do I get around this error to write the table to BigQuery?

This is the failing call:

(
    df.write.format("bigquery")
    .partitionBy("period_start")
    .option("writeMethod", "direct")
    .option("table", bigquery_destination_table)
    .option("createDisposition", "CREATE_IF_NEEDED")
    .mode("overwrite")
    .save()
)

I initialized the spark session with:

spark = (
SparkSession.builder.appName("my_app")
.config(
    "spark.jars",
    ",".join([
        "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
        "https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.1.jar",
    ]),)
.getOrCreate()
)
spark.conf.set("temporaryGcsBucket", "my-bucket")
spark.conf.set("viewsEnabled", "true")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
spark.sparkContext.setLogLevel("WARN")

Full stack trace below:

Using the default container image
Waiting for container log creation
PYSPARK_PYTHON=/opt/dataproc/conda/bin/python
Generating /home/spark/.pip/pip.conf
Configuring index-url as 'https://us-python.pkg.dev/artifact-registry-python-cache/virtual-python/simple/'
JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
SPARK_EXTRA_CLASSPATH=
:: loading settings :: file = /etc/spark/conf/ivysettings.xml
24/10/01 16:44:29 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-google-hadoop-file-system.properties,hadoop-metrics2.properties
24/10/01 16:44:29 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
24/10/01 16:44:29 INFO MetricsSystemImpl: google-hadoop-file-system metrics system started
24/10/01 16:44:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/10/01 16:49:29 WARN BigQueryDirectDataSourceWriterContext: BigQuery Data Source writer 9d5a4a2f-38ae-4597-981f-abbdca72bfc8 aborted
Traceback (most recent call last):
  File "/var/dataproc/tmp/srvls-batch-46a3af8a-1da1-4099-8aa7-b0b3d610dab8/my_file.py", line 484, in <module>
    .save()
     ^^^^^^
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o913.save.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: unexpected issue trying to save [period_start: date, period_end: date ... 20 more fields]
    at com.google.cloud.spark.bigquery.write.BigQueryDataSourceWriterInsertableRelation.insert(BigQueryDataSourceWriterInsertableRelation.java:128)
    at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:54)
    at com.google.cloud.spark.bigquery.v2.Spark31BigQueryTableProvider.createRelation(Spark31BigQueryTableProvider.java:65)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:473)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:473)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:449)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase optimization failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
    at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:537)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:549)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:148)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:144)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:162)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:182)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:207)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:206)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3849)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3847)
    at org.apache.spark.sql.Dataset.toJavaRDD(Dataset.scala:3859)
    at com.google.cloud.spark.bigquery.write.BigQueryDataSourceWriterInsertableRelation.insert(BigQueryDataSourceWriterInsertableRelation.java:70)
    ... 43 more
Caused by: java.lang.NullPointerException
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:903)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.collect.SingletonImmutableList.<init>(SingletonImmutableList.java:41)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.collect.ImmutableList.of(ImmutableList.java:101)
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.getPartitionFields(BigQueryUtil.java:504)
    at com.google.cloud.spark.bigquery.v2.Spark32BigQueryScanBuilder.filterAttributes(Spark32BigQueryScanBuilder.java:44)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.$anonfun$getFilterableTableScan$1(PartitionPruning.scala:82)
    at scala.Option.flatMap(Option.scala:283)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.getFilterableTableScan(PartitionPruning.scala:62)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1(PartitionPruning.scala:253)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1$adapted(PartitionPruning.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:333)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:241)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:219)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:527)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:527)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1227)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1227)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1227)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1227)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.plans.logical.DeserializeToObject.mapChildren(object.scala:79)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:500)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.prune(PartitionPruning.scala:219)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.apply(PartitionPruning.scala:274)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.apply(PartitionPruning.scala:52)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.immutable.ArraySeq.foldLeft(ArraySeq.scala:222)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:333)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:152)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:547)
    ... 57 more
davidrabinowitz commented 1 month ago

Are you running this on Dataproc serverless? What is the runtime version?

lorenh516 commented 1 month ago

Yes, running on Dataproc serverless with default runtime version. Looks like it's Spark runtime 2.2 LTS based on gcloud documentation. Update: The failed batch details subpage lists Version: 2.2.25.

Also noting that the error persisted when I tried using the .jar with latest connector version: https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.40.0.jar.

davidrabinowitz commented 1 month ago

If you are running on Dataproc serverless, there's no need to set the connectors in the spark.jar property as they are built into the image. You can use the latest connector by setting --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar on batch creation.

lorenh516 commented 1 month ago

thanks! i've switched to doing that, though it didn't resolve my issue.

davidrabinowitz commented 1 month ago

Can you please share the full stack trace from the 0.41.0 connector?

lorenh516 commented 1 month ago

I've made several changes to my code since last week and have since been able to write the joined table to BQ/ have not been able to reproduce the error. I'm going to close this issue for now and will open a new one if I run into it again.