sorenmacbeth / flambo

A Clojure DSL for Apache Spark
Eclipse Public License 1.0

spark 2.0 text-file error #95

Closed clojurians-org closed 8 years ago

clojurians-org commented 8 years ago

I hit the following issue when running this code. The equivalent call succeeds in the Scala REPL (shown below for comparison). Can someone help? I'll keep investigating as well.

(defproject etl-helper "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :profiles {:provided {:dependencies [[org.apache.spark/spark-core_2.11 "2.0.0"]]}
             :dev {:aot :all}} 
  :aot :all
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [yieldbot/flambo "0.7.1"]
                 [incanter "1.5.7"]])

(ns etl-helper.core
  (:require [flambo.conf :as conf]
            [flambo.api :as f]))

(defonce sc (-> (conf/spark-conf) (conf/master "local") (conf/app-name "ods") f/spark-context))

#_(def data (f/text-file sc "/home/larluo/data/data.csv.gz"))
#_(def data (f/text-file sc "hdfs://192.168.1.3:9000/user/hive/warehouse/stg.db/sample/data/*.gz"))
(.textFile sc "/home/larluo/data/data.csv.gz")

For comparison, the same call succeeds in the Scala REPL:

scala> sc.textFile("/home/larluo/data/data.csv.gz")
res1: org.apache.spark.rdd.RDD[String] = /home/larluo/data/data.csv.gz MapPartitionsRDD[1] at textFile at <console>:25

Log output from lein compile:

larluo@client:~/work/git/etl-helper$ lein compile
Compiling etl-helper.core
log4j:WARN No appenders could be found for logger (flambo.api).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/07/28 14:27:18 INFO SparkContext: Running Spark version 2.0.0
16/07/28 14:27:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/07/28 14:27:18 INFO SecurityManager: Changing view acls to: larluo
16/07/28 14:27:18 INFO SecurityManager: Changing modify acls to: larluo
16/07/28 14:27:18 INFO SecurityManager: Changing view acls groups to: 
16/07/28 14:27:18 INFO SecurityManager: Changing modify acls groups to: 
16/07/28 14:27:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(larluo); groups with view permissions: Set(); users  with modify permissions: Set(larluo); groups with modify permissions: Set()
16/07/28 14:27:18 INFO Utils: Successfully started service 'sparkDriver' on port 33100.
16/07/28 14:27:18 INFO SparkEnv: Registering MapOutputTracker
16/07/28 14:27:18 INFO SparkEnv: Registering BlockManagerMaster
16/07/28 14:27:18 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-df81d695-37f4-440c-9d85-4a1470836585
16/07/28 14:27:18 INFO MemoryStore: MemoryStore started with capacity 1938.6 MB
16/07/28 14:27:18 INFO SparkEnv: Registering OutputCommitCoordinator
16/07/28 14:27:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
16/07/28 14:27:18 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
16/07/28 14:27:18 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
16/07/28 14:27:18 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
16/07/28 14:27:18 INFO Utils: Successfully started service 'SparkUI' on port 4044.
16/07/28 14:27:18 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.2:4044
16/07/28 14:27:18 INFO Executor: Starting executor ID driver on host localhost
16/07/28 14:27:18 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34156.
16/07/28 14:27:18 INFO NettyBlockTransferService: Server created on 192.168.1.2:34156
16/07/28 14:27:18 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.2, 34156)
16/07/28 14:27:18 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.2:34156 with 1938.6 MB RAM, BlockManagerId(driver, 192.168.1.2, 34156)
16/07/28 14:27:18 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.2, 34156)
16/07/28 14:27:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 1938.5 MB)
java.lang.NoSuchMethodError: scala.collection.immutable.ListSet$.empty()Lscala/collection/immutable/ListSet;, compiling:(core.clj:10:1)
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.ListSet$.empty()Lscala/collection/immutable/ListSet;, compiling:(core.clj:10:1)
    at clojure.lang.Compiler$InstanceMethodExpr.eval(Compiler.java:1532)
    at clojure.lang.Compiler.compile1(Compiler.java:7474)
    at clojure.lang.Compiler.compile(Compiler.java:7541)
    at clojure.lang.RT.compile(RT.java:406)
    at clojure.lang.RT.load(RT.java:451)
    at clojure.lang.RT.load(RT.java:419)
    at clojure.core$load$fn__5677.invoke(core.clj:5893)
    at clojure.core$load.invokeStatic(core.clj:5892)
    at clojure.core$load.doInvoke(core.clj:5876)
    at clojure.lang.RestFn.invoke(RestFn.java:408)
    at clojure.core$load_one.invokeStatic(core.clj:5697)
    at clojure.core$compile$fn__5682.invoke(core.clj:5903)
    at clojure.core$compile.invokeStatic(core.clj:5903)
    at clojure.core$compile.invoke(core.clj:5895)
    at user$eval20$fn__29.invoke(form-init1896567387473311988.clj:1)
    at user$eval20.invokeStatic(form-init1896567387473311988.clj:1)
    at user$eval20.invoke(form-init1896567387473311988.clj:1)
    at clojure.lang.Compiler.eval(Compiler.java:6927)
    at clojure.lang.Compiler.eval(Compiler.java:6917)
    at clojure.lang.Compiler.eval(Compiler.java:6917)
    at clojure.lang.Compiler.load(Compiler.java:7379)
    at clojure.lang.Compiler.loadFile(Compiler.java:7317)
    at clojure.main$load_script.invokeStatic(main.clj:275)
    at clojure.main$init_opt.invokeStatic(main.clj:277)
    at clojure.main$init_opt.invoke(main.clj:277)
    at clojure.main$initialize.invokeStatic(main.clj:308)
    at clojure.main$null_opt.invokeStatic(main.clj:342)
    at clojure.main$null_opt.invoke(main.clj:339)
    at clojure.main$main.invokeStatic(main.clj:421)
    at clojure.main$main.doInvoke(main.clj:384)
    at clojure.lang.RestFn.invoke(RestFn.java:421)
    at clojure.lang.Var.invoke(Var.java:383)
    at clojure.lang.AFn.applyToHelper(AFn.java:156)
    at clojure.lang.Var.applyTo(Var.java:700)
    at clojure.main.main(main.java:37)
Caused by: java.lang.NoSuchMethodError: scala.collection.immutable.ListSet$.empty()Lscala/collection/immutable/ListSet;
    at com.twitter.chill.ScalaCollectionsRegistrar.apply(ScalaKryoInstantiator.scala:130)
    at com.twitter.chill.AllScalaRegistrar.apply(ScalaKryoInstantiator.scala:176)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:136)
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:274)
    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:259)
    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:175)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:233)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1370)
    at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:984)
    at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:981)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:682)
    at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:981)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:802)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:800)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:682)
    at org.apache.spark.SparkContext.textFile(SparkContext.scala:800)
    at org.apache.spark.api.java.JavaSparkContext.textFile(JavaSparkContext.scala:172)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
    at clojure.lang.Compiler$InstanceMethodExpr.eval(Compiler.java:1527)
    ... 34 more
16/07/28 14:27:19 INFO SparkContext: Invoking stop() from shutdown hook
16/07/28 14:27:19 INFO SparkUI: Stopped Spark web UI at http://192.168.1.2:4044
16/07/28 14:27:19 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/07/28 14:27:19 INFO MemoryStore: MemoryStore cleared
16/07/28 14:27:19 INFO BlockManager: BlockManager stopped
16/07/28 14:27:19 INFO BlockManagerMaster: BlockManagerMaster stopped
16/07/28 14:27:19 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/07/28 14:27:19 INFO SparkContext: Successfully stopped SparkContext
16/07/28 14:27:19 INFO ShutdownHookManager: Shutdown hook called
16/07/28 14:27:19 INFO ShutdownHookManager: Deleting directory /tmp/spark-de004f26-23e4-4138-a1fc-ef4e93be5a0e
Compilation failed: Subprocess failed
sorenmacbeth commented 8 years ago

flambo hasn't yet been tested against Spark 2.0, so there's currently no expectation that anything will work without modifications.

clojurians-org commented 8 years ago

I switched to Spark 1.6 in the :provided profile and still get the same error. It seems related to the Scala configuration, but I'd expect the Spark jars directory to already contain the right Scala libraries.
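
For what it's worth, that NoSuchMethodError is the classic symptom of mixing Scala 2.10 and 2.11 binaries on one classpath: the signature of ListSet$.empty() differs between the two versions, so the mismatch only surfaces at runtime. A quick way to check which Scala runtime actually wins on the classpath (a diagnostic sketch, not part of the original report; scala.util.Properties is the standard Scala version API):

;; Run from a REPL inside the project. If this prints a 2.10.x version
;; while spark-core_2.11 expects 2.11.x (or vice versa), the classpath
;; has a Scala version conflict.
(println (scala.util.Properties/versionString))

;; Running `lein deps :tree` from the shell likewise shows which
;; scala-library version each dependency pulls in.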

sorenmacbeth commented 8 years ago

It looks like you're trying to use Scala 2.11 libraries while flambo 0.7.1 is compiled against Scala 2.10. I don't know enough about Scala to say for certain whether that's causing your issue, but I suspect it is. Last night I published flambo 0.8.0-SNAPSHOT, which is compiled against Scala 2.11 and compatible with Spark 2.0.0 (at least the tests pass). You might try that.
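
For reference, a minimal project.clj sketch of that suggestion, keeping the version strings mentioned in this thread (the SNAPSHOT dependency assumes the usual Clojars snapshot repository is available):

(defproject etl-helper "0.1.0-SNAPSHOT"
  ;; spark-core_2.11 and flambo 0.8.0-SNAPSHOT are both built against
  ;; Scala 2.11, so the classpath carries a single Scala version.
  :profiles {:provided {:dependencies [[org.apache.spark/spark-core_2.11 "2.0.0"]]}}
  :aot :all
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [yieldbot/flambo "0.8.0-SNAPSHOT"]])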

clojurians-org commented 8 years ago

It works. I love your work :-)

sorenmacbeth commented 8 years ago

cheers!