sorenmacbeth / flambo

A Clojure DSL for Apache Spark
Eclipse Public License 1.0
606 stars 84 forks

flatMap Exception: clojure.lang.PersistentVector cannot be cast to java.util.Iterator #96

Closed clojurians-org closed 8 years ago

clojurians-org commented 8 years ago

I'll look into it. The map operation works fine, but flat-map raises a conversion exception.

(defproject etl-helper "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :profiles {:provided {:dependencies [[org.apache.spark/spark-core_2.11 "2.0.0"]]}} 
  :aot :all
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [com.rpl/specter "0.11.2"]
                 [org.clojure/data.csv "0.1.3"]
                 [org.scala-lang/scala-library "2.11.8"]
                 [yieldbot/flambo "0.8.0-SNAPSHOT"]
                 [incanter "1.5.7"]])                                                                                                                            

(ns etl-helper.core                                                                                                                                                    
  (:require [flambo.conf :as conf]                                                                                                                                     
            [flambo.api :as f]                                                                                                                                         
            [flambo.tuple :as ft]                                                                                                                                      
            [com.rpl.specter :refer [ALL FIRST LAST submap]]                                                                                                           
            [com.rpl.specter.macros :refer [select]]                                                                                                                   
            [incanter.core :refer [sel to-matrix to-list]]                                                                                                             
            [incanter.io :refer [read-dataset]]                                                                                                                        
            [clojure.data.csv :as csv :refer [read-csv write-csv]]                                                                                                     
            [clojure.java.io :as io]))                                                                                                                                 

(defonce sc (-> (conf/spark-conf) (conf/master "local[*]") (conf/app-name "ods") f/spark-context))

(-> (f/parallelize sc ["this is line 1" "also line 2"])                                                                                                                
    (f/flat-map (f/fn [l] (clojure.string/split l #" ")))                                                                                                              
    f/collect)  

Log output:


2. Unhandled org.apache.spark.SparkException                                       

1. Caused by java.lang.ClassCastException                                          
   clojure.lang.PersistentVector cannot be cast to java.util.Iterator              

                       nil:   -1  flambo.function.FlatMapFunction/call             
         JavaRDDLike.scala:  124  org.apache.spark.api.java.JavaRDDLike$$anonfun$f$
            Iterator.scala:  434  scala.collection.Iterator$$anon$12/nextCur       
            Iterator.scala:  440  scala.collection.Iterator$$anon$12/hasNext       
            Iterator.scala:  893  scala.collection.Iterator$class/foreach          
            Iterator.scala: 1336  scala.collection.AbstractIterator/foreach        
            Growable.scala:   59  scala.collection.generic.Growable$class/$plus$pl$
         ArrayBuffer.scala:  104  scala.collection.mutable.ArrayBuffer/$plus$plus$$
     TraversableOnce.scala:  310  scala.collection.TraversableOnce$class/to        
            Iterator.scala: 1336  scala.collection.AbstractIterator/to             
     TraversableOnce.scala:  302  scala.collection.TraversableOnce$class/toBuffer  
            Iterator.scala: 1336  scala.collection.AbstractIterator/toBuffer       
     TraversableOnce.scala:  289  scala.collection.TraversableOnce$class/toArray   
            Iterator.scala: 1336  scala.collection.AbstractIterator/toArray        
                 RDD.scala:  893  org.apache.spark.rdd.RDD$$anonfun$collect$1$$ano$
        SparkContext.scala: 1897  org.apache.spark.SparkContext$$anonfun$runJob$5/$
          ResultTask.scala:   70  org.apache.spark.scheduler.ResultTask/runTask    
                Task.scala:   85  org.apache.spark.scheduler.Task/run              
            Executor.scala:  274  org.apache.spark.executor.Executor$TaskRunner/ru$
   ThreadPoolExecutor.java: 1142  java.util.concurrent.ThreadPoolExecutor/runWorke$
   ThreadPoolExecutor.java:  617  java.util.concurrent.ThreadPoolExecutor$Worker/r$
               Thread.java:  745  java.lang.Thread/run       
sorenmacbeth commented 8 years ago

The Java API changed in 2.0.0, so flat-map functions now have to return a java.util.Iterator. Check out the tests for an example of what this might look like.
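
A minimal sketch of that change, adapted from the failing snippet above. This assumes that the vector returned by clojure.string/split (a clojure.lang.PersistentVector) implements java.lang.Iterable, so calling .iterator on it yields the java.util.Iterator that Spark 2.0.0 expects:

```clojure
;; Sketch: explicitly convert the seq result into a java.util.Iterator,
;; since Spark 2.0.0's FlatMapFunction.call must return one.
(-> (f/parallelize sc ["this is line 1" "also line 2"])
    (f/flat-map (f/fn [l]
                  ;; split returns a PersistentVector, which is Iterable;
                  ;; the type hint avoids reflection on .iterator
                  (.iterator ^java.lang.Iterable
                             (clojure.string/split l #" "))))
    f/collect)
```
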

sorenmacbeth commented 8 years ago

I added a macro called iterator-fn which you can use in place of f/fn; it will wrap the body in a type-hinted call to .iterator.