GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

NullPointerException when using Java connector with time partitioned tables #1254

Closed: rafalh closed this issue 3 months ago

rafalh commented 4 months ago

When I try to run the following code in PySpark on Dataproc 2.2, I get a NullPointerException from the connector:

dt = '2024-07-07'
# Read two time-partitioned tables, each restricted to a single
# partition date via the _PARTITIONDATE pseudo-column.
df1 = (
    spark.read
    .format("bigquery")
    .option("table", 'table1')
    .option("filter", f"_PARTITIONDATE = '{dt}'")
    .load()
    .select("page_view.page_view_id")
)
df2 = (
    spark.read
    .format("bigquery")
    .option("table", 'table2')
    .option("filter", f"_PARTITIONDATE = '{dt}'")
    .load()
    .select("page_view_id")
)
# The exception is raised while the join is being planned during .cache().
df1.join(df2, "page_view_id").cache()

I tried different versions of the connector (by setting the SPARK_BQ_CONNECTOR_URL metadata on the Dataproc cluster):

It seems only the Java connector for Spark 3.2+ is affected, which makes sense because Spark32BigQueryScanBuilder appears in the stack trace. From what I understand, the exception is caused by the table's time partitioning having no field set, so BigQueryUtil.getPartitionFields passes null to ImmutableList.of; that makes sense because my tables have no explicit partitioning column (the _PARTITIONTIME pseudo-column is used instead).
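
This can be confirmed with the google-cloud-bigquery Python client (a quick check, assuming the client library is installed; "project.dataset.table1" is a placeholder for the real table ID): ingestion-time partitioned tables report a partitioning type but no field:

from google.cloud import bigquery

client = bigquery.Client()
# "project.dataset.table1" is a placeholder for the real table ID.
table = client.get_table("project.dataset.table1")
tp = table.time_partitioning
# Ingestion-time partitioned tables have a partitioning type (e.g. "DAY")
# but `field` is None; queries filter on the _PARTITIONTIME/_PARTITIONDATE
# pseudo-columns instead of an explicit column.
print(tp.type_ if tp else None)   # e.g. "DAY"
print(tp.field if tp else None)   # None -> pseudo-column partitioning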

Stacktrace:

Py4JJavaError: An error occurred while calling o104.cache.
: org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase optimization failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
    at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:537)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:549)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:148)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:144)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:333)
    at org.apache.spark.sql.execution.CacheManager.$anonfun$cacheQuery$2(CacheManager.scala:125)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:120)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:93)
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:3785)
    at org.apache.spark.sql.Dataset.cache(Dataset.scala:3795)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:906)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.collect.SingletonImmutableList.<init>(SingletonImmutableList.java:41)
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.collect.ImmutableList.of(ImmutableList.java:101)
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.getPartitionFields(BigQueryUtil.java:504)
    at com.google.cloud.spark.bigquery.v2.Spark32BigQueryScanBuilder.filterAttributes(Spark32BigQueryScanBuilder.java:44)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.$anonfun$getFilterableTableScan$1(PartitionPruning.scala:82)
    at scala.Option.flatMap(Option.scala:271)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.getFilterableTableScan(PartitionPruning.scala:62)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1(PartitionPruning.scala:253)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1$adapted(PartitionPruning.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:241)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:219)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:527)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:527)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1227)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1226)
    at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:524)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:500)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.prune(PartitionPruning.scala:219)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.apply(PartitionPruning.scala:274)
    at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.apply(PartitionPruning.scala:52)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:152)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:547)
    ... 24 more
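
Until a release with a fix is available, a workaround that should avoid the failing code path (an assumption based on the PartitionPruning frames above, not something I have verified across all versions) is to disable Spark's dynamic partition pruning:

# Skips the PartitionPruning optimizer rule where the NPE originates.
# Note: this may slow down other joins that benefit from dynamic pruning.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")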
vishalkarve15 commented 3 months ago

Merged the fix. It will be available in the next release.
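
Once it ships, the new connector can also be pinned on the Spark side instead of through cluster metadata; the version below is a placeholder for whichever release includes the fix:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # <version-with-fix> is a placeholder: substitute the first release
    # that contains this fix (check CHANGES.md for the release notes).
    .config("spark.jars.packages",
            "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:<version-with-fix>")
    .getOrCreate()
)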

rafalh commented 3 months ago

@vishalkarve15 I think you forgot to add an entry to CHANGES.md. When can I expect a new release?

vishalkarve15 commented 3 months ago

Adding it here: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1271