microsoft / hyperspace

An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads.
https://aka.ms/hyperspace
Apache License 2.0

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.Join.copy #366

Closed: AFFogarty closed this issue 3 years ago

AFFogarty commented 3 years ago

Describe the issue

Calling .show() on a joined DataFrame throws the error:

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.Join.copy(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;Lorg/apache/spark/sql/catalyst/plans/JoinType;Lscala/Option;)Lorg/apache/spark/sql/catalyst/plans/logical/Join;
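A NoSuchMethodError like this usually indicates a binary mismatch: the Hyperspace jar was compiled against one Spark version but is running against another. The descriptor in the error appears to be the four-argument Join.copy from Spark 2.4; Spark 3.0 added a hint field to Join, which changes that signature. A minimal, self-contained sketch of the mechanism, using stand-in case classes (JoinV1/JoinV2 are illustrations, not Spark classes):

```scala
// Stand-in classes (not Spark's): V1 mirrors a 4-field Join node;
// V2 mirrors one with an extra hint field, as added in Spark 3.0.
case class JoinV1(left: String, right: String, joinType: String, condition: Option[String])
case class JoinV2(left: String, right: String, joinType: String, condition: Option[String], hint: Option[String])

// copy's JVM signature follows the field list, so adding a field changes it:
val v1Copy = classOf[JoinV1].getMethods.find(_.getName == "copy").get
val v2Copy = classOf[JoinV2].getMethods.find(_.getName == "copy").get
println(v1Copy.getParameterCount) // 4
println(v2Copy.getParameterCount) // 5
// Bytecode compiled against V1's copy(...) fails with NoSuchMethodError when
// only V2 is present on the run-time classpath.
```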

To Reproduce

In Azure Synapse Analytics, run the Hitchhiker's Guide to Hyperspace (Scala) notebook.

Run the cells in order until you reach the cell that looks like this:

val eqJoin: DataFrame =
      empDFrame.
      join(deptDFrame, empDFrame("deptId") === deptDFrame("deptId")).
      select(empDFrame("empName"), deptDFrame("deptName"))

eqJoin.show()

hyperspace.explain(eqJoin)(displayHTML(_))

Running this cell produces an error with the following stacktrace:

Error: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.Join.copy(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;Lorg/apache/spark/sql/catalyst/plans/JoinType;Lscala/Option;)Lorg/apache/spark/sql/catalyst/plans/logical/Join;
  at com.microsoft.hyperspace.index.rules.JoinIndexRule$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(JoinIndexRule.scala:65)
  at com.microsoft.hyperspace.index.rules.JoinIndexRule$$anonfun$apply$1$$anonfun$applyOrElse$1.apply(JoinIndexRule.scala:61)
  at scala.Option.map(Option.scala:146)
  at com.microsoft.hyperspace.index.rules.JoinIndexRule$$anonfun$apply$1.applyOrElse(JoinIndexRule.scala:61)
  at com.microsoft.hyperspace.index.rules.JoinIndexRule$$anonfun$apply$1.applyOrElse(JoinIndexRule.scala:57)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
...

Expected behavior

The DataFrame should be displayed without error.

Environment

Tested in Azure Synapse Analytics.
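When diagnosing this, it helps to record the exact runtime versions, since Hyperspace artifacts are built per Spark/Scala version (the assumption here being that the Synapse pool runs a Spark version the installed Hyperspace jar was not compiled against). A small sketch; the Spark line is commented out because it needs an active session:

```scala
// The Scala runtime version is available from the standard library:
println(s"Scala runtime: ${scala.util.Properties.versionNumberString}")
// Inside the notebook's Spark session, also record the Spark version:
// println(s"Spark runtime: ${spark.version}")   // `spark` is the session
```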

thugsatbay commented 3 years ago
import org.apache.spark.sql.DataFrame
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConfig

// Setup implied by the snippet below: a Hyperspace instance and an
// index-enabled session
val hyperspace = new Hyperspace(spark)
spark.enableHyperspace

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.hyperspace.explain.displayMode", "html")
val departments = Seq(
  (10, "Accounting", "New York"),
  (20, "Research", "Dallas"),
  (30, "Sales", "Chicago"),
  (40, "Operations", "Boston"))
// Sample employee records
val employees = Seq(
  (7369, "SMITH", 20),
  (7499, "ALLEN", 30),
  (7521, "WARD", 30),
  (7566, "JONES", 20),
  (7698, "BLAKE", 30),
  (7782, "CLARK", 10),
  (7788, "SCOTT", 20),
  (7839, "KING", 10),
  (7844, "TURNER", 30),
  (7876, "ADAMS", 20),
  (7900, "JAMES", 30),
  (7934, "MILLER", 10),
  (7902, "FORD", 20),
  (7654, "MARTIN", 30))
// Save sample data in the Parquet format
import spark.implicits._
val empDFM: DataFrame = employees.toDF("empId", "empName", "deptId")
val deptDFM: DataFrame = departments.toDF("deptId", "deptName", "location")
val dataPath = testDir // testDir: scratch directory path, defined elsewhere
val empLocation: String = s"$dataPath/employees.parquet"
val deptLocation: String = s"$dataPath/departments.parquet"
empDFM.write.mode("overwrite").parquet(empLocation)
deptDFM.write.mode("overwrite").parquet(deptLocation)
val empDF: DataFrame = spark.read.parquet(empLocation)
val deptDF: DataFrame = spark.read.parquet(deptLocation)
val empIndexConfig: IndexConfig = IndexConfig("empIndex", Seq("deptId"), Seq("empName"))
val deptIndexConfig1: IndexConfig = IndexConfig("deptIndex1", Seq("deptId"), Seq("deptName"))
val deptIndexConfig2: IndexConfig = IndexConfig("deptIndex2", Seq("location"), Seq("deptName"))
hyperspace.createIndex(empDF, empIndexConfig)
hyperspace.createIndex(deptDF, deptIndexConfig1)
hyperspace.createIndex(deptDF, deptIndexConfig2)

val empDFrame = empDF
val deptDFrame = deptDF
val eqJoin: DataFrame =
  empDFrame.
    join(deptDFrame, empDFrame("deptId") === deptDFrame("deptId")).
    select(empDFrame("empName"), deptDFrame("deptName"))
eqJoin.show()
hyperspace.explain(eqJoin)

Running this script locally on a dev machine against OSS Spark works just fine.
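One way to confirm the mismatch on the failing cluster is to compare the runtime signature of Join.copy, obtained via reflection, against the descriptor printed in the error message. `copySignatures` below is a hypothetical helper (not part of Hyperspace or Spark), demonstrated on a stand-in case class so the snippet stays self-contained:

```scala
// Hypothetical helper: list the parameter types of a class's copy method(s).
def copySignatures(cls: Class[_]): Seq[String] =
  cls.getMethods.toSeq
    .filter(_.getName == "copy")
    .map(_.getParameterTypes.map(_.getName).mkString("copy(", ", ", ")"))

// On the failing cluster one would inspect Spark's Join node, e.g.:
// copySignatures(classOf[org.apache.spark.sql.catalyst.plans.logical.Join])
//   .foreach(println)

// Self-contained demonstration on a stand-in case class:
case class Pair(a: Int, b: String)
println(copySignatures(classOf[Pair]).head) // copy(int, java.lang.String)
```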