sorenmacbeth / flambo

A Clojure DSL for Apache Spark
Eclipse Public License 1.0

Getting error while using map and filter functions #135

Closed mayankpahwa closed 6 years ago

mayankpahwa commented 6 years ago

Hi, I have just started using flambo and was testing out some basic stuff. I have written some code that reads a file and applies a few transformations to it, something like:

;; f is flambo.api, s is clojure.string; header-txt, header,
;; rreg and regex-inuse are defined elsewhere in my namespace
(-> rdd
    (f/filter (f/fn [e] (not= header-txt e)))
    (f/map (f/fn [e] (mapv #(s/replace (s/trim %) rreg "")
                           (s/split e regex-inuse))))
    (f/map (f/fn [e] (zipmap header e)))
    (f/collect))

When I tested this code in a local Spark environment, everything ran fine. However, when I ran the same code against Spark in a cluster setup, I got the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 45, 172.21.0.3, executor 0): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

     Driver stacktrace:

I tried to debug the error, and it looks like the map and filter functions might be causing it. I can run collect or take on the RDD object directly, but as soon as a map or filter is applied, the error above is thrown.
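To illustrate, here is a minimal sketch of what does and does not work (same rdd as above; any f/fn seems to trigger it, an identity map is just the smallest case I can think of):

(f/collect rdd)          ;; works: no Clojure closure is shipped to executors

(-> rdd
    (f/map (f/fn [e] e)) ;; shipping any f/fn to the executors
    (f/collect))         ;; => the java.lang.ClassCastException above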

Could you please point out what the cause might be and how to solve it? I am using Spark v2.1.1.

Thanks, Mayank

mayankpahwa commented 6 years ago

I referred to this answer and resolved the issue by adding my application's jar to Spark's classpath. Hence closing this.
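For anyone hitting the same thing: the executors need the jar containing the AOT-compiled classes that flambo generates for each f/fn; when that jar is missing from their classpath, deserializing the task closure fails with the ClassCastException above. A minimal sketch of the fix using flambo's SparkConf wrappers (the master URL and jar path are placeholders for my setup):

(require '[flambo.conf :as conf]
         '[flambo.api :as f])

(def c
  (-> (conf/spark-conf)
      (conf/master "spark://master:7077")   ;; placeholder cluster URL
      (conf/app-name "my-app")
      ;; ship the application uberjar to every executor (placeholder path)
      (conf/set "spark.jars" "target/my-app-standalone.jar")))

(def sc (f/spark-context c))

Passing the jar with spark-submit --jars achieves the same thing; the key point is that any jar containing the serialized Clojure function classes must be on the executors' classpath.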