
Storage Partitioned Join (SPJ) fails when >2 tables are joined #10450

Open mrbrahman opened 1 month ago

mrbrahman commented 1 month ago

Apache Iceberg version

None

Query engine

Spark

Please describe the bug 🐞

SPJ works great when joining 2 tables. For example:

// SPJ setup
import org.apache.spark.sql.functions._

val df = spark.range(0,1000000)

val a = df.repartition(10)
    .withColumn("part_1", spark_partition_id())

// create tables with different datasets, but partitioned the same way
a.withColumn("c1", rand()).withColumn("c2", expr("uuid()"))
  .write.partitionBy("part_1").format("iceberg").saveAsTable("ice1")

a.withColumn("c3", rand()).withColumn("c4", expr("uuid()"))
  .write.partitionBy("part_1").format("iceberg").saveAsTable("ice2")

a.withColumn("c5", rand()).withColumn("c6", expr("uuid()"))
  .write.partitionBy("part_1").format("iceberg").saveAsTable("ice3")

-- SPJ in action
-- disable broadcast for testing
set spark.sql.autoBroadcastJoinThreshold = -1;

set spark.sql.sources.v2.bucketing.enabled=true;
set `spark.sql.iceberg.planning.preserve-data-grouping`=true;

-- for our case, need this too
set spark.sql.requireAllClusterKeysForCoPartition=false;

create table ice_joined_1 using iceberg
partitioned by (part_1)
select a.id, c1, c2, c3, c4, a.part_1
from ice1 a
  inner join ice2 b
    on a.id=b.id   -- main join key
    and a.part_1=b.part_1  -- join on all sub-partition fields to enable SPJ

[screenshot: physical plan of the two-table join, no Exchange/shuffle]

No shuffle! SPJ works!
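
For reference, a minimal way to double-check that SPJ kicked in (beyond the screenshot above) is to inspect the physical plan and confirm there is no Exchange under the join; this sketch assumes the same session settings as above are still in effect:

-- sketch: with SPJ active, the plan should show the join reading directly
-- from the two BatchScans, with no Exchange (shuffle) node on either side
explain formatted
select a.id, c1, c2, c3, c4, a.part_1
from ice1 a
  inner join ice2 b
    on a.id=b.id
    and a.part_1=b.part_1;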

But as soon as I add a third table to the join:

create table ice_joined_2 using iceberg
partitioned by (part_1)
select a.id, c1, c2, c3, c4, c5, c6, a.part_1
from ice1 a
  inner join ice2 b
    on a.id=b.id   -- main join key
    and a.part_1=b.part_1  -- join on all sub-partition fields to enable SPJ
  inner join ice3 c
    on a.id=c.id   -- main join key
    and a.part_1=c.part_1  -- join on all sub-partition fields to enable SPJ
;

... and BAM

Error happens in sql: 
create table ice_joined_2 using iceberg
partitioned by (part_1)

select a.id, c1, c2, c3, c4, c5, c6, a.part_1
from ice1 a
  inner join ice2 b
    on a.id=b.id   
    and a.part_1=b.part_1  
  inner join ice3 c
    on a.id=c.id   
    and a.part_1=c.part_1  

org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
    at org.apache.spark.SparkException$.internalError(SparkException.scala:88)
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:560)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:572)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:177)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:316)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:692)
    at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:316)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:331)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:285)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:264)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.zeppelin.spark.SparkSqlInterpreter.internalInterpret(SparkSqlInterpreter.java:106)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.createKeyGroupedShuffleSpec(EnsureRequirements.scala:647)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.$anonfun$checkKeyGroupCompatible$1(EnsureRequirements.scala:447)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:444)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:426)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.org$apache$spark$sql$execution$exchange$BaseEnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:205)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:679)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:654)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:612)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:612)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:605)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1322)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1321)
    at org.apache.spark.sql.execution.ProjectExec.mapChildren(basicPhysicalOperators.scala:43)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:605)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:581)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.apply(EnsureRequirements.scala:654)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.apply(EnsureRequirements.scala:69)
    at org.apache.spark.sql.execution.adaptive.RuleHelper$RuleSeq.$anonfun$applyAll$1(RuleHelper.scala:28)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.adaptive.RuleHelper$RuleSeq.applyAll(RuleHelper.scala:27)
    at org.apache.spark.sql.execution.adaptive.PreprocessingRule$.$anonfun$apply$2(PreprocessingRule.scala:60)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan$PreprocessingRuleExecutor.$anonfun$doExecute$1(InsertAdaptiveSparkPlan.scala:259)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan$PreprocessingRuleExecutor.doExecute(InsertAdaptiveSparkPlan.scala:258)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan$PreprocessingRuleExecutor.execute(InsertAdaptiveSparkPlan.scala:229)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.preprocessAndCreateAdaptivePlan(InsertAdaptiveSparkPlan.scala:175)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:132)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:115)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.$anonfun$applyInternal$1(InsertAdaptiveSparkPlan.scala:125)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:125)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:115)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:48)
    at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:495)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:494)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:184)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
    ... 58 more
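
As a fallback sketch (an assumption, not something verified in this thread, and only if the shuffle cost is acceptable), turning the SPJ-related settings back off should let the 3-way join plan with ordinary shuffle joins instead of going through the key-grouped partitioning path that hits the assertion:

-- fallback sketch: turn the SPJ-related settings back off so planning does
-- not attempt key-grouped (storage-partitioned) joins for this statement
set spark.sql.sources.v2.bucketing.enabled=false;
set `spark.sql.iceberg.planning.preserve-data-grouping`=false;

create table ice_joined_2 using iceberg
partitioned by (part_1)
select a.id, c1, c2, c3, c4, c5, c6, a.part_1
from ice1 a
  inner join ice2 b
    on a.id=b.id and a.part_1=b.part_1
  inner join ice3 c
    on a.id=c.id and a.part_1=c.part_1;
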
mrbrahman commented 4 weeks ago

Hi @RussellSpitzer, I see you have responded on other SPJ tickets. Any ideas on this one?

Thanks