aalkilani / spark-kafka-cassandra-applying-lambda-architecture


Error when running Streaming SQL in Zeppelin #13

Closed JonnyDaenen closed 7 years ago

JonnyDaenen commented 7 years ago

This is related to the video "Streaming Aggregations with Zeppelin: Demo"

When I run the following code in Apache Zeppelin:

%sql
select from_unixtime(timestamp_hour/1000, "MM-dd HH:mm:00") as timestamphour, purchase_count, add_to_cart_count, page_view_count
from activityByProduct

I receive the following error (or sometimes no output at all):

java.lang.NoSuchMethodException: org.apache.spark.io.LZ4CompressionCodec.<init>(org.apache.spark.SparkConf)
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.getConstructor(Class.java:1825)
    at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:72)
    at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:66)
    at org.apache.spark.sql.execution.SparkPlan.org$apache$spark$sql$execution$SparkPlan$$decodeUnsafeRows(SparkPlan.scala:265)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$1.apply(SparkPlan.scala:351)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$1.apply(SparkPlan.scala:350)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:350)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
    at sun.reflect.GeneratedMethodAccessor114.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.zeppelin.spark.ZeppelinContext.showDF(ZeppelinContext.java:214)
    at org.apache.zeppelin.spark.SparkSqlInterpreter.interpret(SparkSqlInterpreter.java:129)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:94)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:341)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Any hints on fixing it?

Thanks!

JonnyDaenen commented 7 years ago

I managed to get it to work by setting the compression codec to Snappy before creating the StreamingContext:

sc.getConf.set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
@transient val ssc = new StreamingContext(sc, Seconds(4))
ssc.checkpoint("hdfs://lambda-pluralsight:9000/spark/checkpoint")

Note that I still get the original error occasionally, even with these settings...
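
For what it's worth, `spark.io.compression.codec` also accepts the short name `snappy`, and since the codec is read when the SparkContext is created, setting it on `sc.getConf` after Zeppelin has already started the context may not always take effect — which might explain why the error still shows up occasionally. A possibly more reliable variant of the same workaround (a sketch, not verified against this course's setup) is to set the property before the context starts, either in Zeppelin's Spark interpreter settings or in `conf/spark-defaults.conf`:

```
# conf/spark-defaults.conf, or the equivalent property in Zeppelin's
# Spark interpreter settings (restart the interpreter afterwards)
spark.io.compression.codec snappy
```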

aalkilani commented 7 years ago

Thanks @JonnyDaenen. There appears to be a Zeppelin bug logged for this, along with a thread that attempts to explain some of the behavior. Since this isn't an issue with the course code itself, I'm closing this ticket with the comments you've provided and the links below:
https://issues.apache.org/jira/browse/ZEPPELIN-1799
https://groups.google.com/forum/#!topic/spark-notebook-user/aGRBESad8Ew