sorenmacbeth / flambo

A Clojure DSL for Apache Spark
Eclipse Public License 1.0
606 stars, 84 forks

Add functionality for dataframes #126

Closed: plandes closed this 6 years ago

plandes commented 7 years ago

Since data frames are the next big thing in Spark, I have added functions to do things like this:

```clojure
(-> (...create sql session...)
    (df/map row-encoder (f/fn ...
```

Note that unlike RDDs you have to specify the encoder. I can add more documentation if you like.
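To make the encoder requirement concrete, here is a minimal sketch of building one, assuming `sql/struct-type` (from this PR) accepts a seq of `{:name ... :type ...}` maps and that Spark's `RowEncoder/apply` is used under the hood, as in the larger example later in this thread:

```clojure
;; Sketch only: assumes [flambo.sql :as sql] provides struct-type
;; taking maps like {:name "id"} (string by default) or
;; {:name "score" :type :integer}.
(import '(org.apache.spark.sql.catalyst.encoders RowEncoder))

(def schema
  (sql/struct-type [{:name "id"}
                    {:name "score" :type :integer}]))

;; The encoder tells Spark how to serialize each emitted row;
;; RDD transformations never needed this, DataFrame/Dataset ones do.
(def row-encoder (RowEncoder/apply schema))
```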

I also have a lot of other things I could add for using flambo with Spark, and I have gotten a REPL to work in Spark (powderkeg not necessary). However, I wanted to take things slowly since this seems like an inactive project.

Love Flambo; it's been very helpful in my research, so kudos for making a great software package!

Regards, Paul

NonaryR commented 7 years ago

Hi @plandes, great work! Can you provide some example or test of this functionality? For example, simple mapping and filtering over dataframe?

NonaryR commented 7 years ago

@plandes any updates?

plandes commented 7 years ago

Sorry, not yet. This week is tough, but I'll have time to address this over the weekend. It's not trivial to provide an example by copy/paste since I've written a lot of helper libraries around flambo. I could contribute those as well (as mentioned) but then the pull request wouldn't be trivial.

plandes commented 6 years ago

I've come up with something that should make sense, although it references a missing helper function.

Note that this is a contrived example: I have many other functions I now use to do things like create a cached (think mutex-protected atom) Spark context, session, and RowEncoder (needed for data frames), along with many other helpers for the new DataFrame API.
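For the "cached session" idea, here is a hypothetical sketch using an atom as the guard; the names are mine and not part of this PR:

```clojure
;; Hypothetical helper, not part of this PR: cache the SparkSession
;; in an atom so repeated calls (e.g. from a REPL) reuse one session.
(import '(org.apache.spark.sql SparkSession))

(defonce ^:private session-cache (atom nil))

(defn cached-session
  "Return the cached SparkSession, creating it on first use."
  [app-name]
  (or @session-cache
      (swap! session-cache
             (fn [s]
               (or s
                   (-> (SparkSession/builder)
                       (.appName app-name)
                       (.getOrCreate)))))))
```

`SparkSession/builder` with `.getOrCreate` is itself idempotent, so the atom mainly saves the builder round-trip on repeated REPL calls.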

Again, let me know if you have questions, but maybe a larger pull request would be easier. Up to you.

```clojure
;; Assumed requires: [flambo.conf :as conf] [flambo.api :as f]
;;                   [flambo.sql :as sql], plus the new dataframe
;;                   namespace from this PR as `df`.
;; Assumed imports: org.apache.spark.sql.SparkSession
;;                  org.apache.spark.sql.catalyst.encoders.RowEncoder
(let [struct (->> ["id" "host" "yada"]
                  (map #(hash-map :name %))
                  (#(concat % [{:name "some_score" :type :integer}]))
                  sql/struct-type)
      row-encoder (RowEncoder/apply struct)
      ;; `limit` is assumed to be bound in the surrounding scope
      query (str "select id, some_text from some_table"
                 (when limit (format " limit %d" limit)))
      conf (-> (conf/spark-conf)
               (conf/app-name "name")
               (f/spark-cluster))
      session (-> (SparkSession/builder)
                  (.config conf)
                  (.getOrCreate))]
  (-> (.sql session query)
      ;; `get-host` is a helper from my own code
      (df/map row-encoder
              (f/fn [{:keys [id some_text]}]
                [id (get-host) (str some_text "more text") (.hashCode id)]))
      .write
      (.parquet "hdfs://some/tmp/dir")))
```
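For reviewers unfamiliar with the underlying interop, here is my rough guess at the shape a `df/map` wrapper boils down to: Spark's typed `Dataset.map` takes a `MapFunction` plus the output encoder explicitly. This is a sketch, not the actual PR code:

```clojure
;; Rough sketch of the interop a df/map-style wrapper performs;
;; not the actual PR implementation.
(import '(org.apache.spark.api.java.function MapFunction)
        '(org.apache.spark.sql Dataset))

(defn map-rows
  "Map f over a Dataset of Rows; `encoder` describes the output rows.
  f must return something the encoder knows how to serialize."
  [^Dataset ds encoder f]
  (.map ds
        (reify MapFunction
          (call [_this row] (f row)))
        encoder))
```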
plandes commented 6 years ago

Regarding moving the dataframe code into flambo.sql: this seems like a bigger decision. If you decide it should live in flambo.sql, I would respectfully disagree with that choice, but I don't feel terribly strongly about it and would gladly fold it in.

Thanks again for writing flambo! I believe I mentioned I was able to get it to work in a REPL. Being able to prototype in a REPL via CIDER in Emacs, then reuse the code in a spark-submit, has been tremendously powerful and useful, and I have this project to thank for that!

Cheers!

Regards, Paul

leon-barrett commented 6 years ago

Cool, looks good, thanks.