aalkilani / spark-kafka-cassandra-applying-lambda-architecture

Other
64 stars 52 forks source link

Kafka receiver demo #8

Closed VictorPelaez closed 7 years ago

VictorPelaez commented 7 years ago

Hi,

When I am running spark streaming kafka receiver demo in Zeppelin, I get an error due to "Logging". I have read that spark 1.6.2 or 1.6.3 don´t support it. (http://stackoverflow.com/questions/38893655/spark-twitter-streaming-exception-org-apache-spark-logging-classnotfound)

java.lang.NoClassDefFoundError: org/apache/spark/Logging at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:81) ... 46 elided Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 59 more

I have imported and inserted in the interpreter the dependences for kafka. Any ideas?

Thanks

aalkilani commented 7 years ago

Are you seeing any other problems with the demo or simply that this is coming up? We're not doing anything with Spark logging in this course so something like this can safely be ignored. If you see any other problems then it's likely due to something else. If you're still having issues, can you please detail exactly which Module and clip you're having issues with so I can replicate.

Thanks

VictorPelaez commented 7 years ago

Hi, I found this problem with Zeppelin. I posted in the Discussion board an other issue about the YARN demo, but I don´t think they are relationed. Already with box v.0.0.6 (Spark 1.6.3 and Kafka 0.9.0.0), in Spark intepreter I added (instead of yours):

Then, LogProducer runs perfectly and starts to send messages. Just execute your Zeppelin notebook for this demo (all the imports and ssc creation works for me) and it gives me an error in :

_val kstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Map("weblogs-text" -> 1), StorageLevel.MEMORY_ANDDISK).map(._2)__

Error trace is the same published in last comment. Also, I tried to change all the versions in pom dependences, but still fails.

kafka_zeppelin

aalkilani commented 7 years ago

Thanks for the feedback. I can't say for certain why this changed and I've tried to trace back release notes for Zeppelin and for my Docker builds but couldn't find the correlation. In any case, what seems to have happened is that Zeppelin 0.6.1 has switched to using Spark 2.0. There's a distinction to understand in the course's VM.

There's a Spark version that is running on Yarn, and locally if you just run a spark-submit. This is the version that is under /pluralsight/spark. This is currently set as 1.6.3.

There is also a Spark version that comes pre-built with Zeppelin. Ideally these would match as not to cause any confusion and that certainly was the intent. The gist of it is that Zeppelin 0.6.1 now ships with Spark 2.0 and that version of Spark has carved out the dependencies for Kafka a little differently.

Give this a shot and let me know if it works (tested locally and all looks good) and I'll announce an update to this:

org.apache.kafka:kafka_2.11:0.8.2.1 
org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 
VictorPelaez commented 7 years ago

Thanks for the answer, It works for me!! Just changing the dependences in the Zeppelin Spark interpreter. I don´t understand why everything works changing the spark version to 2.0 and we haven't got this version in the VM... In addition, in the VM we have another version of kafka, so how can it work?

Thanks again,

aalkilani commented 7 years ago

@VictorPelaez , Zeppelin is a little special in that it comes pre-built with its own version of Spark that ends up running locally by default as you can tell from the Spark interpreter settings.

Any build of Zeppelin will come pre-packaged with a specific version of Spark and using a specific version of Scala. So Zeppelin's default behavior is to use the pre-built version of Spark it comes packaged with. As Zeppelin 0.6.1 was still in pre-release, they were still using Spark 1.6.x.. once Zeppelin released 0.6.1 they officially switched over to Spark 2.0. Zeppelin by default uses its pre-built Spark binaries and runs in local mode. That's why we're trying to match what Zeppelin has even though it's not apparent that the VM has that version of Spark because the VM in reality doesn't provide that version, it's Zeppelin's own build that has that version baked in.

Now, you can instruct Zeppelin to actually use a different version of Spark by telling it to run on Yarn and use the version that we are using in the VM. That would work and will also give you the added advantage of easily looking at the Spark UI even when running jobs from Zeppelin. The disadvantage is that running Spark on Yarn in a VM with very limited resources is a very delicate dance of managing memory and other resources so you will likely spend more time troubleshooting memory allocation problems because all of the resources don't have enough space to breathe.

If you wish to run Zeppelin against the same version of Spark on the VM and run against Yarn (not local mode)

  1. Your host where you're running the VM needs to have enough resources, you can increase the VM's memory by editing the Vagrantfile Increase the memory on this line v.memory = 4096 For example to 8GB v.memory = 8192

  2. Then run vagrant reload

  3. Now your VM is running with 8GB of ram. The next step is to increase the memory Spark can use when running on Yarn. To do that, edit the file under /pluralsight/spark/config/spark-defaults Here are the currently used settings. You will want to increase these (double or triple them):

    spark.driver.memory 1G
    spark.executor.memory 512M
    spark.yarn.executor.memoryOverhead 1024
    spark.executor.instances 1
  4. Finally , modify Zeppelin to use the VM's version of Spark and run on Yarn. From the Zeppelin UI, go to Interpreters and under Spark's interpreter adjust/add the following

Again, none of this is necessary but if you wish to try to run with the same version of Spark and use Yarn as well, these are the steps required.

Glad everything is working out for you now.

have the option to note use Zeppelin's pre-built version for example in favor of running

VictorPelaez commented 7 years ago

Thanks for your explaination, and congrats for the course and your effort!