microsoft / hyperspace

An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads.
https://aka.ms/hyperspace
Apache License 2.0

Unable to use hyperspace on databricks runtime 8.4 (spark 3.1.2 scala 2.12) #520

Open · richiesgr opened 2 years ago

richiesgr commented 2 years ago

Describe the issue

Trying to use the Hyperspace API produces a runtime exception:

```
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;
  at com.microsoft.hyperspace.index.covering.CoveringIndexRuleUtils$.transformPlanToUseIndexOnlyScan(CoveringIndexRuleUtils.scala:110)
  at com.microsoft.hyperspace.index.covering.CoveringIndexRuleUtils$.transformPlanToUseIndex(CoveringIndexRuleUtils.scala:79)
  at com.microsoft.hyperspace.index.covering.FilterIndexRule$.applyIndex(FilterIndexRule.scala:148)
  at com.microsoft.hyperspace.index.rules.HyperspaceRule.apply(HyperspaceRule.scala:75)
  at com.microsoft.hyperspace.index.rules.HyperspaceRule.apply$(HyperspaceRule.scala:62)
  at com.microsoft.hyperspace.index.covering.FilterIndexRule$.apply(FilterIndexRule.scala:129)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.$anonfun$recApply$3(ScoreBasedIndexPlanOptimizer.scala:56)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.$anonfun$recApply$3$adapted(ScoreBasedIndexPlanOptimizer.scala:55)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.recApply(ScoreBasedIndexPlanOptimizer.scala:55)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.$anonfun$recApply$2(ScoreBasedIndexPlanOptimizer.scala:47)
  at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:197)
  at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:196)
  at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
  at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.recChildren$1(ScoreBasedIndexPlanOptimizer.scala:46)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.$anonfun$recApply$3(ScoreBasedIndexPlanOptimizer.scala:59)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.$anonfun$recApply$3$adapted(ScoreBasedIndexPlanOptimizer.scala:55)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.recApply(ScoreBasedIndexPlanOptimizer.scala:55)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.$anonfun$recApply$2(ScoreBasedIndexPlanOptimizer.scala:47)
  at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:197)
  at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:196)
  at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
  at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:224)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.recChildren$1(ScoreBasedIndexPlanOptimizer.scala:46)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.$anonfun$recApply$3(ScoreBasedIndexPlanOptimizer.scala:59)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.$anonfun$recApply$3$adapted(ScoreBasedIndexPlanOptimizer.scala:55)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.recApply(ScoreBasedIndexPlanOptimizer.scala:55)
  at com.microsoft.hyperspace.index.rules.ScoreBasedIndexPlanOptimizer.apply(ScoreBasedIndexPlanOptimizer.scala:79)
  at com.microsoft.hyperspace.index.rules.ApplyHyperspace$.apply(ApplyHyperspace.scala:59)
  at com.microsoft.hyperspace.index.rules.ApplyHyperspace$.apply(ApplyHyperspace.scala:32)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:221)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:221)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:89)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:218)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:210)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:210)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:188)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:109)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:188)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:112)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:180)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:180)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:109)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:109)
  at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:120)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:139)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:136)
  at com.databricks.sql.transaction.tahoe.metering.DeltaMetering$.reportUsage(ScanReport.scala:147)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:185)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
  at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3813)
  at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3021)
  at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:263)
  at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:97)
  at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:415)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.computeListResultsItem(PythonDriverLocal.scala:799)
  at com.databricks.backend.daemon.driver.PythonDriverLocalBase.genListResults(PythonDriverLocalBase.scala:379)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:857)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:735)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:814)
  at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:650)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:777)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$6(PythonDriverLocal.scala:219)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:735)
  at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:206)
  at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$13(DriverLocal.scala:542)
  at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:240)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
  at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:235)
  at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:232)
  at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:51)
  at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:279)
  at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:271)
  at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:51)
  at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:519)
  at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
  at scala.util.Try$.apply(Try.scala:213)
  at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
  at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
  at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
  at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
  at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
  at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
  at java.lang.Thread.run(Thread.java:748)
```
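For context: a `NoSuchMethodError` like this typically means the Hyperspace jar was compiled against an open-source Spark whose `LogicalPlan.transformDown` has a different binary signature than the class the Databricks runtime actually ships. A quick way to see what the cluster reports (a minimal sketch; the second conf key is Databricks-specific and an assumption, hence the default value):

```python
# Compare the advertised Spark version with the Databricks runtime tag.
# "spark.databricks.clusterUsageTags.sparkVersion" only exists on Databricks
# clusters, so a default is supplied for other deployments.
print(spark.version)  # e.g. "3.1.2"
print(spark.sparkContext.getConf().get(
    "spark.databricks.clusterUsageTags.sparkVersion", "not a Databricks cluster"))
```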

To Reproduce

Steps to reproduce the behavior (a consolidated snippet follows the list):

  1. Build Hyperspace from source targeting Spark 3.1.1 and install the jar on the Databricks cluster.
  2. Read the Parquet data: `df = spark.read.parquet(<location>)`
  3. Create a Hyperspace index: `from hyperspace import IndexConfig`, then `hs.createIndex(df, IndexConfig("index", ["indexField"], ["FieldToInclude"]))`
  4. Enable Hyperspace: `Hyperspace.enable(spark)`
  5. Query the data: `query1 = df.filter("""indexField = '12daec2882aa4662b2d8a050950da9ae'""").select('FieldToInclude')`
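Put together, the steps look like this. A sketch, not verbatim from the report: the placeholder path is kept from step 2, and `hs = Hyperspace(spark)` is assumed, since the original steps call `hs.createIndex` without showing how `hs` was created:

```python
from hyperspace import Hyperspace, IndexConfig

# Step 2: read the source data (placeholder path kept from the original report).
df = spark.read.parquet("<location>")

# Step 3: build a covering index over the filter column, including the
# projected column. `hs` is assumed to be a Hyperspace handle.
hs = Hyperspace(spark)
hs.createIndex(df, IndexConfig("index", ["indexField"], ["FieldToInclude"]))

# Step 4: inject the Hyperspace optimizer rules into the session.
Hyperspace.enable(spark)

# Step 5: a filter/select that should be answerable from the index alone.
# On Databricks Runtime 8.4 this instead fails with the NoSuchMethodError
# above as soon as the query is executed.
query1 = df.filter("""indexField = '12daec2882aa4662b2d8a050950da9ae'""") \
           .select('FieldToInclude')
query1.show()
```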

Expected behavior

Get the data back :-)
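On a working setup, Hyperspace's explain API can also confirm that the index is picked up before running the query. A short sketch, assuming the `hs` handle and `query1` from the repro steps (the `verbose` keyword is an assumption about the Python signature; adjust if your version differs):

```python
# Show how the optimizer would rewrite the plan; on success the physical plan
# should reference the index files rather than the original Parquet location.
hs.explain(query1, verbose=True)
query1.show()  # expected: rows come back instead of a NoSuchMethodError
```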

Environment

Please complete the following information if applicable:

  - Databricks Runtime: 8.4 (Spark 3.1.2, Scala 2.12), per the issue title
  - Hyperspace: built from source targeting Spark 3.1.1 (step 1 above)

richiesgr commented 2 years ago

Still the same problem.

sezruby commented 2 years ago

Hi @richiesgr, we cannot fully support the Databricks runtime because it uses a custom Spark build. Since the Hyperspace implementation is tightly coupled with Spark internals, we cannot fix the issue without knowing Databricks' internal codebase.
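One way to see that divergence directly from a notebook is to reflect on the Catalyst classes the runtime actually ships. A hypothetical diagnostic, not a Hyperspace API; it goes through PySpark's internal `_jvm` gateway, which is not a stable interface:

```python
# List the transformDown overloads exposed by the cluster's LogicalPlan class.
# If none matches the signature in the NoSuchMethodError, the runtime's
# Catalyst build is binary-incompatible with the Hyperspace jar.
jvm = spark._jvm  # py4j gateway into the driver JVM (internal API)
plan_cls = jvm.java.lang.Class.forName(
    "org.apache.spark.sql.catalyst.plans.logical.LogicalPlan")
for method in plan_cls.getMethods():
    if method.getName() == "transformDown":
        print(method)  # full JVM signature, including parameter and return types
```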

StixCoutinho commented 2 years ago

Hi @sezruby! Would Databricks be interested in supporting Hyperspace in the Azure Databricks environment? How could I get information about this from Databricks?