teragrep / pth_10

Data Processing Language (DPL) translator for Apache Spark
GNU Affero General Public License v3.0

Bloom create and update throw error #249

Closed. elliVM closed this issue 3 months ago.

elliVM commented 4 months ago

Describe the bug

Bloom create or update throws an error:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
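
For context, this AnalysisException is Spark's generic guard against running a batch action (here, Dataset.foreachPartition) on a streaming Dataset. A minimal sketch of the failure mode, assuming a local Spark session; the class name and the rate source are illustrative, not part of pth_10:

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StreamingBatchActionSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("streaming-batch-action")
                .getOrCreate();

        // A streaming source: its rows cannot be consumed with batch actions.
        Dataset<Row> streaming = spark.readStream().format("rate").load();

        // A batch action on a streaming Dataset is rejected during analysis with
        // "Queries with streaming sources must be executed with writeStream.start()".
        streaming.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
            // e.g. collect tokens into a bloom filter here
        });
    }
}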

Expected behavior

The commands should write the filters without error and without changing the dataset.

How to reproduce

Zeppelin note used:

%spark.conf

dpl.pth_10.bloom.pattern=^(?:[0-9]{1,3}.){3}[0-9]{1,3}$
dpl.pth_06.kafka.enabled=false
dpl.pth_06.bloom.enabled=true
dpl.pth_06.bloom.withoutFilter=false
dpl.pth_06.bloom.db.fields=[{expected: 700, fpp: 0.01},{expected: 5500, fpp: 0.01},{expected: 12500, fpp: 0.01},{expected: 36500, fpp: 0.01},{expected: 115000, fpp: 0.01},{expected: 230000, fpp: 0.01},{expected: 375000, fpp: 0.01},{expected: 610000, fpp: 0.01},{expected: 905000, fpp: 0.01},{expected: 1850000, fpp: 0.01},{expected: 2750000, fpp: 0.01},{expected: 3750000, fpp: 0.01},{expected: 5050000, fpp: 0.01},{expected: 6500000, fpp: 0.01},{expected: 8900000, fpp: 0.01},{expected: 1100000, fpp: 0.01}]
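
Each {expected, fpp} entry in dpl.pth_06.bloom.db.fields carries the two parameters a bloom filter is sized from: the expected number of insertions and the target false-positive probability. A minimal sketch of that sizing, assuming it maps onto Spark's org.apache.spark.util.sketch API (an assumption for illustration, not confirmed by this issue):

import org.apache.spark.util.sketch.BloomFilter;

public class BloomSizingSketch {
    public static void main(String[] args) {
        // Assumption: each {expected, fpp} entry sizes one candidate filter.
        BloomFilter filter = BloomFilter.create(700L, 0.01);
        filter.putString("192.168.1.1");
        System.out.println(filter.mightContainString("192.168.1.1")); // true
        System.out.println(filter.mightContainString("10.0.0.1"));    // false, or a rare false positive
    }
}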

%dpl

index=crud_small earliest=-999d | teragrep exec tokenizer format bytes input '_raw' output 'tokens' | teragrep exec hdfs save bloomByteTokens overwrite=true

%dpl

| teragrep exec hdfs load bloomByteTokens | teragrep exec bloom estimate | teragrep exec hdfs save bloomEstimateTokens overwrite=true

%dpl

| teragrep exec hdfs load bloomByteTokens | join type=inner max=0 partition [| teragrep exec hdfs load bloomEstimateTokens ] | teragrep exec bloom create estimates 'R_estimate(tokens)' input 'tokens'

Version

QA, using pth-10 version 4.18.0-18-g6394139a

Additional context

Dump:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[bloomByteTokens/data]
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    ... (the TreeNode$$anonfun$foreachUp$1.apply / List.foreach / TreeNode.foreachUp frames repeat for each level of the plan tree)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:76)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2734)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2746)
    at com.teragrep.pth10.steps.teragrep.TeragrepBloomStep.createBloomFilter(TeragrepBloomStep.java:143)
    at com.teragrep.pth10.steps.teragrep.TeragrepBloomStep.get(TeragrepBloomStep.java:114)
    at com.teragrep.pth10.ast.StepList.executeFromStep(StepList.java:184)
    at com.teragrep.pth10.ast.StepList.execute(StepList.java:155)
    at com.teragrep.pth10.ast.DPLParserCatalystVisitor.visitRoot(DPLParserCatalystVisitor.java:283)
    at com.teragrep.pth10.ast.DPLParserCatalystVisitor.visitRoot(DPLParserCatalystVisitor.java:80)
    at com.teragrep.pth_03.antlr.DPLParser$RootContext.accept(DPLParser.java:3083)
    at com.teragrep.pth_03.shaded.org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
    at com.teragrep.pth_07.DPLExecutor.interpret(DPLExecutor.java:185)
    at com.teragrep.pth_07.DPLInterpreter.internalInterpret(DPLInterpreter.java:159)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

elliVM commented 3 months ago

Fixed by adding a BloomMode.AGGREGATE check to the TeragrepBloomStep class:

if (mode == BloomMode.ESTIMATE || mode == BloomMode.AGGREGATE) {
    // both estimate and aggregate (create/update) run as aggregations
    this.properties.add(CommandProperty.AGGREGATE);
}
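
The point of the flag: bloom create and update aggregate over the whole stream, so the step must be executed through Spark's streaming machinery instead of as a one-shot batch action. For illustration only, the generic streaming-safe pattern in Spark (not necessarily the exact mechanism pth_10 uses once CommandProperty.AGGREGATE is set) runs batch actions inside foreachBatch:

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ForeachBatchSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("foreach-batch")
                .getOrCreate();

        Dataset<Row> streaming = spark.readStream().format("rate").load();

        // foreachBatch hands each micro-batch over as an ordinary batch Dataset,
        // so batch actions such as foreachPartition are legal inside it.
        streaming.writeStream()
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) ->
                        batchDf.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
                            // aggregate tokens into a bloom filter per micro-batch
                        }))
                .start()
                .awaitTermination();
    }
}

With foreachBatch, each micro-batch arrives as a plain batch Dataset, so the same bloom-filter aggregation code can serve both batch and streaming execution paths.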