gorillalabs / sparkling

A Clojure library for Apache Spark: fast, fully-featured, and developer-friendly
https://gorillalabs.github.io/sparkling/
Eclipse Public License 1.0

Running your code guide #3

Open chrisbetz opened 9 years ago

chrisbetz commented 9 years ago

Extend the guides section with documentation on how to deploy your system.

prasincs commented 9 years ago

The Mesos portion is fairly straightforward too -- I did that in the process of verifying Sparkling. You just have to change the master URL to the Mesos one; the rest is setting up a Mesos cluster, and there are pretty good tutorials elsewhere for that part.

I suggest the following function for the guide. I haven't tried this on anything other than local and Mesos, so it could be different for other environments. For Mesos, the master mesos:// URL needs to be given and the cluster-type assigned.

;; assumes the usual Sparkling requires, e.g.
;; (:require [sparkling.conf :as conf]
;;           [sparkling.core :as spark])
(defn make-spark-context [cluster-type & {:keys [master] :or {master nil}}]
  (let [get-master #(condp = cluster-type
                      :local "local[*]"
                      :mesos master
                      :yarn master
                      :spark-cluster master)
        c (-> (conf/spark-conf)
              (conf/master (get-master))
              (conf/app-name "Test")
              (conf/set "spark.executor.uri" "hdfs://namenode/spark/spark-1.3.0-bin-hadoop2.3.tgz"))]
    (spark/spark-context c)))
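
For example, it could be called like this (the Mesos master URL here is made up):

(def local-sc (make-spark-context :local))
(def mesos-sc (make-spark-context :mesos :master "mesos://10.0.0.1:5050"))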

Update: There's a lot of stuff that needs to be done to keep the classes in sync in the Mesos case for a realistic guide.
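
As a rough sketch of one piece of that -- making sure the executors load the same build of your application classes as the driver -- you could ship your uberjar via spark.jars (the master URL and jar path below are made up):

;; hedged sketch: spark.executor.uri fetches the Spark distribution on Mesos,
;; while spark.jars distributes your own uberjar so driver and executors see
;; the same compiled Clojure classes; all paths/URLs are illustrative
(def c
  (-> (conf/spark-conf)
      (conf/master "mesos://10.0.0.1:5050")
      (conf/app-name "Test")
      (conf/set "spark.executor.uri" "hdfs://namenode/spark/spark-1.3.0-bin-hadoop2.3.tgz")
      (conf/set "spark.jars" "target/my-app-standalone.jar")))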

joefromct commented 9 years ago

Is there any example code anywhere that could help me understand what's required to run over a cluster via YARN? The only thing I've come across is a presentation at http://www.slideshare.net/cb.betz/spark-and-clojure-2 (slide #56), but there are no samples showing how to do this.

What might the (conf/master) parameter look like?

erasmas commented 9 years ago

@joefromct There's nothing special in Spark config for YARN. Here's what I do.

(def spark-conf
  (-> (conf/spark-conf)
      ;; spark-submit supplies spark.master (e.g. --master yarn); fall back to local[*] otherwise
      (conf/master (System/getProperty "spark.master" "local[*]"))
      (conf/app-name "my-spark-app")))

Then you build your uberjar and submit it with spark-submit --master yarn my-spark-app-standalone.jar ...
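
A hedged sketch of what the entry point for that uberjar might look like (the namespace and names are illustrative), reusing the conf above:

(ns my-spark-app.core
  (:require [sparkling.conf :as conf]
            [sparkling.core :as spark])
  (:gen-class)) ;; AOT-compiled so spark-submit can find the main class

(def spark-conf
  (-> (conf/spark-conf)
      (conf/master (System/getProperty "spark.master" "local[*]"))
      (conf/app-name "my-spark-app")))

(defn -main [& args]
  (let [sc (spark/spark-context spark-conf)]
    ;; ... define RDDs and run your jobs here ...
    ))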

ponimas commented 9 years ago

Is there any way to work in a REPL on YARN?

chrisbetz commented 9 years ago

Hi,

Yes, you can start an nrepl server inside your driver and connect to that.
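
A minimal sketch of that idea (assuming tools.nrepl, which was current at the time, is on the driver's classpath; the namespace and port are illustrative):

;; start an nREPL server inside the driver process so an editor can connect;
;; binding to 0.0.0.0 makes it reachable from outside the driver host
(ns my-spark-app.driver
  (:require [clojure.tools.nrepl.server :as nrepl]))

(defonce nrepl-server
  (nrepl/start-server :bind "0.0.0.0" :port 7888))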

Cheers,

Chris


stevenmccord commented 8 years ago

@chrisbetz I am trying to run this in the REPL with the SparkContext pointed at a remote standalone cluster, and it appears that I can connect to it:

16/08/23 22:30:29 INFO Master: Registering app sparkling-example
16/08/23 22:30:29 INFO Master: Registered app sparkling-example with ID app-20160823223029-0005

However, it doesn't appear that anything is executing; it seems to fail when it tries to execute and then restarts on the worker node...

16/08/23 20:51:16 INFO Master: Launching executor app-20160823204841-0003/115 on worker worker-20160823192913-172.17.0.13-36085
16/08/23 20:51:18 INFO Master: Removing executor app-20160823204841-0003/114 because it is EXITED
16/08/23 20:51:18 INFO Master: Launching executor app-20160823204841-0003/116 on worker worker-20160823192922-172.17.0.14-49496
16/08/23 20:51:18 INFO Master: Removing executor app-20160823204841-0003/115 because it is EXITED
16/08/23 20:51:19 INFO Master: Launching executor app-20160823204841-0003/117 on worker worker-20160823192913-172.17.0.13-36085
16/08/23 20:51:21 INFO Master: Removing executor app-20160823204841-0003/117 because it is EXITED

I wasn't sure if anyone else has tried this, or whether my workflow is completely wrong; I am trying to develop in the REPL against a remote Spark cluster. Thanks!

stevenmccord commented 8 years ago

OK, a little bit of an update, and a little more context: I am doing this as I learn, so apologies if these are dumb questions.

I am starting a REPL session and connecting to the remote Spark standalone cluster; it can reach the driver, as you can see in the logs below.

16/08/24 18:51:55 INFO Master: Registering app sparkling-example
16/08/24 18:51:55 INFO Master: Registered app sparkling-example with ID app-20160824185155-0001
16/08/24 18:51:55 INFO Master: Launching executor app-20160824185155-0001/0 on worker worker-20160824123619-172.17.0.17-41680
16/08/24 18:51:55 INFO Master: Launching executor app-20160824185155-0001/1 on worker worker-20160824123619-172.17.0.16-57106
16/08/24 18:51:55 INFO ExecutorRunner: Launch command: "/usr/lib/jvm/java-8-openjdk-amd64/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/lib/spark-assembly-1.5.2-hadoop2.6.0.jar:/opt/spark/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark/lib/datanucleus-core-3.2.10.jar" "-Xms1024M" "-Xmx1024M" "-Dspark.driver.port=53502" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "akka.tcp://sparkDriver@172.17.0.6:53502/user/CoarseGrainedScheduler" "--executor-id" "1" "--hostname" "172.17.0.16" "--cores" "2" "--app-id" "app-20160824185155-0001" "--worker-url" "akka.tcp://sparkWorker@172.17.0.16:57106/user/Worker"

I have gotten past the error in the comment I posted before, so it isn't restarting, but now when I try to do (spark/first data) I get the error below. Any help would be great. Thanks!

2. Unhandled org.apache.spark.SparkException
   Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.17.0.17): java.lang.IllegalStateException: unread block data
   at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)

   Driver stacktrace:

            DAGScheduler.scala: 1283  org.apache.spark.scheduler.DAGScheduler/org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages
            DAGScheduler.scala: 1271  org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1/apply
            DAGScheduler.scala: 1270  org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1/apply
          ResizableArray.scala:   59  scala.collection.mutable.ResizableArray$class/foreach
             ArrayBuffer.scala:   47  scala.collection.mutable.ArrayBuffer/foreach
            DAGScheduler.scala: 1270  org.apache.spark.scheduler.DAGScheduler/abortStage
            DAGScheduler.scala:  697  org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1/apply
            DAGScheduler.scala:  697  org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1/apply
                  Option.scala:  236  scala.Option/foreach
            DAGScheduler.scala:  697  org.apache.spark.scheduler.DAGScheduler/handleTaskSetFailed
            DAGScheduler.scala: 1496  org.apache.spark.scheduler.DAGSchedulerEventProcessLoop/doOnReceive
            DAGScheduler.scala: 1458  org.apache.spark.scheduler.DAGSchedulerEventProcessLoop/onReceive
            DAGScheduler.scala: 1447  org.apache.spark.scheduler.DAGSchedulerEventProcessLoop/onReceive
               EventLoop.scala:   48  org.apache.spark.util.EventLoop$$anon$1/run
            DAGScheduler.scala:  567  org.apache.spark.scheduler.DAGScheduler/runJob
            SparkContext.scala: 1822  org.apache.spark.SparkContext/runJob
            SparkContext.scala: 1835  org.apache.spark.SparkContext/runJob
            SparkContext.scala: 1848  org.apache.spark.SparkContext/runJob
                     RDD.scala: 1298  org.apache.spark.rdd.RDD$$anonfun$take$1/apply
       RDDOperationScope.scala:  147  org.apache.spark.rdd.RDDOperationScope$/withScope
       RDDOperationScope.scala:  108  org.apache.spark.rdd.RDDOperationScope$/withScope
                     RDD.scala:  306  org.apache.spark.rdd.RDD/withScope
                     RDD.scala: 1272  org.apache.spark.rdd.RDD/take
                     RDD.scala: 1312  org.apache.spark.rdd.RDD$$anonfun$first$1/apply
       RDDOperationScope.scala:  147  org.apache.spark.rdd.RDDOperationScope$/withScope
       RDDOperationScope.scala:  108  org.apache.spark.rdd.RDDOperationScope$/withScope
                     RDD.scala:  306  org.apache.spark.rdd.RDD/withScope
                     RDD.scala: 1311  org.apache.spark.rdd.RDD/first
             JavaRDDLike.scala:  510  org.apache.spark.api.java.JavaRDDLike$class/first
             JavaRDDLike.scala:   47  org.apache.spark.api.java.AbstractJavaRDDLike/first
 NativeMethodAccessorImpl.java:   -2  sun.reflect.NativeMethodAccessorImpl/invoke0
 NativeMethodAccessorImpl.java:   62  sun.reflect.NativeMethodAccessorImpl/invoke
DelegatingMethodAccessorImpl.java:   43  sun.reflect.DelegatingMethodAccessorImpl/invoke
                   Method.java:  498  java.lang.reflect.Method/invoke
                Reflector.java:   93  clojure.lang.Reflector/invokeMatchingMethod
                Reflector.java:   28  clojure.lang.Reflector/invokeInstanceMethod
                      core.clj:  541  sparkling.core/first
                      core.clj:  541  sparkling.core/first
                          REPL:   14  tf-idf.core/eval9903
                          REPL:   14  tf-idf.core/eval9903
                 Compiler.java: 6927  clojure.lang.Compiler/eval
                 Compiler.java: 6890  clojure.lang.Compiler/eval
                      core.clj: 3105  clojure.core/eval
                      core.clj: 3101  clojure.core/eval
                      main.clj:  240  clojure.main/repl/read-eval-print/fn
                      main.clj:  240  clojure.main/repl/read-eval-print
                      main.clj:  258  clojure.main/repl/fn
                      main.clj:  258  clojure.main/repl
                      main.clj:  174  clojure.main/repl
                   RestFn.java: 1523  clojure.lang.RestFn/invoke
        interruptible_eval.clj:   87  clojure.tools.nrepl.middleware.interruptible-eval/evaluate/fn
                      AFn.java:  152  clojure.lang.AFn/applyToHelper
                      AFn.java:  144  clojure.lang.AFn/applyTo
                      core.clj:  646  clojure.core/apply
                      core.clj: 1881  clojure.core/with-bindings*
                      core.clj: 1881  clojure.core/with-bindings*
                   RestFn.java:  425  clojure.lang.RestFn/invoke
        interruptible_eval.clj:   85  clojure.tools.nrepl.middleware.interruptible-eval/evaluate
        interruptible_eval.clj:   55  clojure.tools.nrepl.middleware.interruptible-eval/evaluate
        interruptible_eval.clj:  222  clojure.tools.nrepl.middleware.interruptible-eval/interruptible-eval/fn/fn
        interruptible_eval.clj:  190  clojure.tools.nrepl.middleware.interruptible-eval/run-next/fn
                      AFn.java:   22  clojure.lang.AFn/run
       ThreadPoolExecutor.java: 1142  java.util.concurrent.ThreadPoolExecutor/runWorker
       ThreadPoolExecutor.java:  617  java.util.concurrent.ThreadPoolExecutor$Worker/run
                   Thread.java:  745  java.lang.Thread/run

My goal is to have a REPL-based development environment while attaching to a Spark cluster outside of the JVM. It appears that I have the connections right and the cluster is able to communicate with the Spark driver in the REPL, but it seems to fail when I try to evaluate the lazy RDDs. I am trying to avoid having to compile a jar, submit it to the local Spark cluster, and so on; if I could do it all in the REPL, my workflow would be much faster. Using the local context in the REPL definitely works fine, but I would also like to connect to a remote Spark cluster. Anyway, I really appreciate any help you can provide!