almond-sh / almond

A Scala kernel for Jupyter
https://almond.sh
BSD 3-Clause "New" or "Revised" License

Spark? More of a Q than an issue. #12

Closed: tylerprete closed this issue 5 years ago

tylerprete commented 9 years ago

I know you were working on jove-spark before, and now this is your focus.

I believe I read somewhere that you have Spark working with jupyter-scala. Are you creating the context in your notebooks, or do you have something else in the works for Spark interop?

tylerprete commented 9 years ago

I just ran across this: https://github.com/alexarchambault/ammonite-shell/commit/eac3efef2c389cf463f895a40bf9293c56c8123b

Probably enough to answer my Q. Anything else to know about?

alexarchambault commented 9 years ago

I just pushed a few-days-old example notebook, here: https://github.com/alexarchambault/jupyter-scala/blob/topic/spark-example/examples/libraries/Spark.ipynb

I'd like to test it a bit more (doing bigger calculations on a real cluster, not a docker-based one), and possibly change the API a bit or add facilities for EC2 or for plotting, before adding that to master.

It should work as is with the current kernels, so it should be easy to test. (Feedback would be welcome :-)

I'll close the issue once the example is in master.

alexarchambault commented 9 years ago

Having Spark work seamlessly from a standard Scala session is a big point of jupyter-scala, and is enabled by my customisations of Ammonite (from https://github.com/alexarchambault/ammonite-shell).

tylerprete commented 9 years ago

Awesome. You're doing really cool stuff here! You should definitely know there are people like me out here following this project closely :)

bmabey commented 9 years ago

+1 to what @tylerprete is saying. (Hi Tyler!)

I just gave this a whirl and set it up on our internal JupyterHub and Spark cluster. I was able to create a SparkContext but function serialization and distribution did not work for me. For example, given this code:

sc.parallelize(1 to 10).map(x => x + x).collect()

I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 1131, minvertica1): java.lang.ClassNotFoundException: cmd11$$user$$anonfun$1$$anonfun$apply$1
    at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65)

If you have only been testing this locally (i.e. with one node), then you would not run into this error, since the function would not need to be serialized and distributed to the other nodes.
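To make that failure mode concrete, here is a minimal sketch (plain Scala, no Spark) of what happens to that lambda: Spark serializes the closure with Java serialization, ships the bytes to an executor, and the executor's class loader must be able to resolve the REPL-generated class (the cmd11$$user$$anonfun... in the trace above) to deserialize it. The names below are made up for illustration.

    import java.io._

    object ClosureShipping {
      // Serialize a value to bytes and read it back, mimicking how Spark
      // ships a task's closure from the driver to an executor.
      def roundTrip[A](value: A): A = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(value)
        oos.close()
        // On a real executor, readObject is where ClassNotFoundException
        // surfaces if the executor's class loader cannot resolve the
        // REPL-generated anonymous-function class.
        val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
        ois.readObject().asInstanceOf[A]
      }

      def main(args: Array[String]): Unit = {
        // Scala compiles this literal to a Serializable anonymous class.
        val double: Int => Int = x => x + x
        println(roundTrip(double)(21)) // 42
      }
    }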

I am using Spark 1.3.1, so this may also be a problem, since the artifact that you built was for 1.3.0.

Also, as an aside: for my kernel I found it helpful to bump the max memory and PermGen space, since loading the Spark driver alone was blowing past the defaults. So in my kernel.json I added:

"env" : {"JVM_OPT": "-Xmx3g -XX:MaxPermSize=128m"},
alexarchambault commented 9 years ago

Same remark as here: are you using the right link?

And are you following this example for Spark?

It works for me with a 3-node standalone cluster.

The lib underlying the kernel (my fork of Ammonite) is unit-tested against standalone clusters, see https://github.com/alexarchambault/ammonite-shell/tree/master/spark/src/test/resources/docker/scripts and https://github.com/alexarchambault/ammonite-shell/blob/scala-2.10.x/project/run-spark-tests (edit: fixed the link, it's on branch scala-2.10.x, not master). Following the notebook demo above works in it, as does your example (with the demo setup).

And the Spark lib ("com.github.alexarchambault" % "ammonite-spark_1.3_2.10.5" % "0.3.1-SNAPSHOT") depends on Spark 1.3.1, btw.
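For reference, a rough sketch of driving that artifact from a session, reconstructed from the method names that show up in the stack traces further down this thread (Spark.start() and Spark.sc). The construction line is a guess, and the artifact is assumed to already be on the kernel's classpath; the example notebook linked above may set things up differently.

    @transient val Spark = new ammonite.spark.Spark // hypothetical setup

    Spark.start()     // creates the SparkContext on the driver
    val sc = Spark.sc // then use it as usual

    sc.parallelize(1 to 10).map(x => x + x).collect()
    // Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)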

tylerprete commented 9 years ago

Hey @bmabey! Funny that our engineering paths cross once again! :) It truly is a small world.

bmabey commented 9 years ago

@alexarchambault Sorry for the noise. I looked more closely at the output of the creation of my Spark context, and it turned out that the driver node wasn't resolving correctly, i.e. a misconfigured Spark cluster on my end.

Once I fixed this issue the notebook example worked perfectly. Thanks for the quick reply. If you ever want me to test some notebooks out on our cluster just ping me.

alexarchambault commented 9 years ago

@bmabey np, thanks for the feedback. I'll keep you posted here when I do more development on the Spark support.

rokroskar commented 8 years ago

Hi @alexarchambault, I tried to follow your Spark example, but it seems to fail (see below). Looks like there might be some version mismatch? New to Scala, so not sure how to debug... any pointers? I compiled jupyter-scala with Scala 2.11.

java.lang.reflect.InvocationTargetException
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:497)
    ammonite.interpreter.Interpreter$$anonfun$process$1$$anonfun$apply$21$$anonfun$apply$24$$anonfun$apply$25$$anonfun$11.apply(Interpreter.scala:259)
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    akka.util.Collections$EmptyImmutableSeq$.<init>(Collections.scala:15)
    akka.util.Collections$EmptyImmutableSeq$.<clinit>(Collections.scala)
    akka.japi.Util$.immutableSeq(JavaAPI.scala:229)
    akka.remote.RemoteSettings.<init>(RemoteSettings.scala:30)
    akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114)
    sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
    scala.util.Try$.apply(Try.scala:191)
    akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
    akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    scala.util.Success.flatMap(Try.scala:230)
    akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
    akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:584)
    akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:577)
    akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
    akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
    org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
    org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
    org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
    org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
    scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
    org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
    org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
    org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
    org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
    org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:269)
    org.apache.spark.SparkContext.<init>(SparkContext.scala:272)
    ammonite.spark.Spark$SparkContext.<init>(Spark.scala:228)
    ammonite.spark.Spark.sc(Spark.scala:186)
    ammonite.spark.Spark.start(Spark.scala:197)
    cmd4$$user$$anonfun$1.apply$mcV$sp(Main.scala:56)
    cmd4$$user.<init>(Main.scala:57)
    cmd4.<init>(Main.scala:61)
    cmd4$.<init>(Main.scala:35)
    cmd4$.<clinit>(Main.scala)
    cmd4$Main$.$main(Main.scala:30)
    cmd4$Main.$main(Main.scala)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:497)
    ammonite.interpreter.Interpreter$$anonfun$process$1$$anonfun$apply$21$$anonfun$apply$24$$anonfun$apply$25$$anonfun$11.apply(Interpreter.scala:259)
java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
    java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    ammonite.interpreter.AddURLClassLoader.findClass(Classes.scala:42)
    java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    akka.util.Collections$EmptyImmutableSeq$.<init>(Collections.scala:15)
    akka.util.Collections$EmptyImmutableSeq$.<clinit>(Collections.scala)
    akka.japi.Util$.immutableSeq(JavaAPI.scala:229)
    akka.remote.RemoteSettings.<init>(RemoteSettings.scala:30)
    akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114)
    sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
    scala.util.Try$.apply(Try.scala:191)
    akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
    akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    scala.util.Success.flatMap(Try.scala:230)
    akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
    akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:584)
    akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:577)
    akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
    akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
    org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
    org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
    org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
    org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
    scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
    org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
    org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
    org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
    org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
    org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:269)
    org.apache.spark.SparkContext.<init>(SparkContext.scala:272)
    ammonite.spark.Spark$SparkContext.<init>(Spark.scala:228)
    ammonite.spark.Spark.sc(Spark.scala:186)
    ammonite.spark.Spark.start(Spark.scala:197)
    cmd4$$user$$anonfun$1.apply$mcV$sp(Main.scala:56)
    cmd4$$user.<init>(Main.scala:57)
    cmd4.<init>(Main.scala:61)
    cmd4$.<init>(Main.scala:35)
    cmd4$.<clinit>(Main.scala)
    cmd4$Main$.$main(Main.scala:30)
    cmd4$Main.$main(Main.scala)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:497)
    ammonite.interpreter.Interpreter$$anonfun$process$1$$anonfun$apply$21$$anonfun$apply$24$$anonfun$apply$25$$anonfun$11.apply(Interpreter.scala:259)
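One plausible reading of that trace, given the _2.10.5 suffix of the artifact mentioned earlier and the 2.11-built kernel: scala.collection.GenTraversableOnce$class is a trait-implementation class from the pre-2.12 Scala encoding, and its not being found while Akka initializes usually points to Scala binary versions being mixed on the class path. A quick check from the session (a sketch, nothing kernel-specific):

    // Print the Scala version the kernel is actually running on, then compare
    // it with the _2.10 / _2.11 suffix of each Spark artifact being loaded
    // (e.g. "ammonite-spark_1.3_2.10.5" is built for Scala 2.10.5 only).
    println(scala.util.Properties.versionNumberString) // e.g. "2.11.7"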
alexarchambault commented 5 years ago

The develop branch, soon to be the default, delegates almost all of its Spark support to ammonite-spark. Ask there for help once the README of the develop branch gets updated.
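As a rough sketch of what that setup looks like in a notebook, based on the almond and ammonite-spark documentation (the version numbers below are placeholders to adjust against the current READMEs):

    import $ivy.`org.apache.spark::spark-sql:2.4.0` // Spark itself
    import $ivy.`sh.almond::almond-spark:0.4.0`     // almond's Spark integration

    import org.apache.spark.sql._

    // NotebookSparkSession hooks Spark job progress into the notebook UI.
    val spark = NotebookSparkSession.builder()
      .master("local[*]")
      .getOrCreate()

    def sc = spark.sparkContext
    sc.parallelize(1 to 10).map(x => x + x).collect()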