qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

Continuous trigger fails on Spark 3, but works on Spark 2.4.5 #99

Open pburner opened 3 years ago

pburner commented 3 years ago

Originally posted as a question on Stack Overflow with more details ("Continuous trigger fails on Spark 3, but works on Spark 2.4.5"). Below is a summary:

Spark Structured Streaming with Kinesis as a source was working fine on Spark 2.4.5. After upgrading to Spark 3.0.0 (also tested with 3.0.1), it started failing with the error "Continuous processing does not support StreamingRelation operations.":

org.apache.spark.sql.AnalysisException: Continuous processing does not support StreamingRelation operations.;;
kinesis
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForContinuous$1(UnsupportedOperationChecker.scala:408)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForContinuous$1$adapted(UnsupportedOperationChecker.scala:390)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForContinuous(UnsupportedOperationChecker.scala:390)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:290)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)
    at com.niceic.dl.kinesispuller.RunnerErr$.main(RunnerErrorSpark3.scala:50)
    at com.niceic.dl.kinesispuller.Spark3ErrorTest.$anonfun$new$2(test.scala:32)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1563)
    at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182)
    at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1563)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232)
    at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
    at org.scalatest.Suite.run(Suite.scala:1112)
    at org.scalatest.Suite.run$(Suite.scala:1094)
    at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237)
    at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236)
    at com.niceic.dl.kinesispuller.Spark3ErrorTest.org$scalatest$BeforeAndAfterAllConfigMap$$super$run(test.scala:24)
    at org.scalatest.BeforeAndAfterAllConfigMap.liftedTree1$1(BeforeAndAfterAllConfigMap.scala:248)
    at org.scalatest.BeforeAndAfterAllConfigMap.run(BeforeAndAfterAllConfigMap.scala:245)
    at org.scalatest.BeforeAndAfterAllConfigMap.run$(BeforeAndAfterAllConfigMap.scala:242)
    at com.niceic.dl.kinesispuller.Spark3ErrorTest.run(test.scala:24)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
    at org.scalatest.tools.Runner$.run(Runner.scala:798)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:41)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
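The frames at the top of the trace show the query being rejected by UnsupportedOperationChecker.checkForContinuous while it walks the logical plan with TreeNode.foreachUp. As a rough illustration only, assuming Spark 3.0 behaves roughly like this: the check visits each plan node bottom-up, accepts a small allow-list of node types plus StreamingRelationV2, and rejects anything else by its node name. A source built on the older V1 Source API shows up as a StreamingRelation node, which matches the message above. The sketch below is a hypothetical toy model in plain Scala with no Spark dependency; the case classes only mimic Spark's node names, and the allow-list is an abbreviated assumption, not Spark's full list.

```scala
// Hypothetical, heavily simplified model of the continuous-mode plan check.
// These are NOT Spark classes; they only mimic Spark's logical-plan node names.

sealed trait PlanNode {
  def children: Seq[PlanNode]
  def nodeName: String // Spark derives this from the class name
}

// Roughly how a V1 (Source-based) streaming source appears in the plan.
final case class StreamingRelation(source: String) extends PlanNode {
  val children: Seq[PlanNode] = Seq.empty
  val nodeName: String = "StreamingRelation"
}

// Roughly how a Data Source V2 streaming source appears in the plan.
final case class StreamingRelationV2(source: String) extends PlanNode {
  val children: Seq[PlanNode] = Seq.empty
  val nodeName: String = "StreamingRelationV2"
}

final case class Project(child: PlanNode) extends PlanNode {
  val children: Seq[PlanNode] = Seq(child)
  val nodeName: String = "Project"
}

object ContinuousCheckSketch {
  // Abbreviated, assumed allow-list of node types accepted in continuous mode.
  private val allowed = Set("Project", "Filter", "SubqueryAlias", "StreamingRelationV2")

  // Visit children before their parent, in the spirit of TreeNode.foreachUp.
  private def foreachUp(node: PlanNode)(f: PlanNode => Unit): Unit = {
    node.children.foreach(child => foreachUp(child)(f))
    f(node)
  }

  // Returns the first error message (Spark throws instead), or None if accepted.
  def checkForContinuous(plan: PlanNode): Option[String] = {
    var error: Option[String] = None
    foreachUp(plan) { node =>
      if (error.isEmpty && !allowed.contains(node.nodeName))
        error = Some(s"Continuous processing does not support ${node.nodeName} operations.")
    }
    error
  }

  def main(args: Array[String]): Unit = {
    // A V1 source under a projection is rejected at the leaf.
    println(checkForContinuous(Project(StreamingRelation("kinesis"))))
    // The same projection over a V2 source passes this simplified check.
    println(checkForContinuous(Project(StreamingRelationV2("kafka"))))
  }
}
```

Running main prints the rejection for the StreamingRelation plan and None for the StreamingRelationV2 plan, which is why the connector would need to implement the V2 continuous-read path again for Trigger.Continuous to work on Spark 3.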

Is this a breaking-change bug in Spark, or was the kinesis-sql connector supposed to be updated to pick up the change?

itsvikramagr commented 3 years ago

@peterburnash - Spark 3 made significant changes to the Data Source V2 APIs, which continuous streaming requires. We had to remove the continuous streaming code to support the connector on Spark 3: https://github.com/qubole/kinesis-sql/pull/92
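Until continuous support is added back, one possible workaround is to run the same query with a micro-batch trigger instead of Trigger.Continuous. This is an assumption drawn from the reply above (only the continuous code was removed for Spark 3), not a documented recommendation. The sketch assumes Spark 3.0 with the Spark 3 build of kinesis-sql on the classpath; the stream name and endpoint are hypothetical placeholders, the option names follow the connector's README as best recalled, and credential options are omitted, so check all of them against the connector version you use.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object KinesisMicroBatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kinesis-micro-batch-sketch")
      .getOrCreate()

    // kinesis-sql source. Stream name and endpoint are hypothetical placeholders;
    // option names assumed from the connector README; credentials omitted.
    val kinesis = spark.readStream
      .format("kinesis")
      .option("streamName", "my-example-stream")
      .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
      .option("startingposition", "TRIM_HORIZON")
      .load()

    val query = kinesis
      // Payload column assumed to be the binary "data" column.
      .selectExpr("CAST(data AS STRING) AS payload")
      .writeStream
      .format("console")
      // Trigger.Continuous("1 second") fails analysis on Spark 3 with this
      // connector; a processing-time trigger runs the query as micro-batches.
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    query.awaitTermination()
  }
}
```

The trade-off is latency: micro-batch execution generally has higher end-to-end latency than the continuous mode that worked on Spark 2.4.5, so this only helps if that latency is acceptable.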

No one is currently working on adding this support for Spark 3. If you are interested, please open a PR and I can help review and merge it into this repo.