sorenmacbeth / flambo

A Clojure DSL for Apache Spark
Eclipse Public License 1.0

Examples using Spark Streaming? #15

Closed · arnaudsj closed this issue 10 years ago

arnaudsj commented 10 years ago

Hi, I am looking for an example project using Flambo that demonstrates how to create both a streaming producer and a consumer for demo purposes.

Ultimately I would like to write a Twitter stream producer, and a NLP consumer.

Any pointers?

Thank you in advance!

sorenmacbeth commented 10 years ago

Unfortunately, this is still an outstanding to-do. I'd love a pull request with some examples if you do get around to creating them.

I think the API is pretty straightforward, and you should be able to figure it out by looking at the examples in the Spark documentation.

arnaudsj commented 10 years ago

@sorenmacbeth thank you.

I have started converting the Twitter hashtag example, but I am stuck on how to actually start the streaming context. Would you mind taking a look and telling me what I am missing?

(ns core-nlp-flambo-101.core
  (:require  [flambo.api :as f]
             [flambo.streaming :as fs]
             [flambo.conf :as fconf]
             [clj-time.format :as tf]
             [clj-time.core :as tc]
             [clojure.tools.trace :refer [trace]]
             [clojure.java.shell :refer [sh]]
             [clojure.pprint :refer [pprint]]
             [org.satta.glob :refer [glob]])
  (:import   [org.apache.spark.streaming.twitter TwitterUtils])
  (:gen-class))

(def master "local[*]")
(def conf {})
(def env {
          "spark.executor.memory" "4G",
          "spark.files.overwrite" "true"
          })

(def ^:dynamic *app-consumer-key* "xxx")
(def ^:dynamic *app-consumer-secret* "xxx")
(def ^:dynamic *user-access-token* "xxx")
(def ^:dynamic *user-access-token-secret* "xxx")

(System/setProperty "twitter4j.oauth.consumerKey", *app-consumer-key*)
(System/setProperty "twitter4j.oauth.consumerSecret", *app-consumer-secret*)
(System/setProperty "twitter4j.oauth.accessToken", *user-access-token*)
(System/setProperty "twitter4j.oauth.accessTokenSecret", *user-access-token-secret*)

(defn new-spark-context []
  (let [c (-> (fconf/spark-conf)
              (fconf/master master)
              (fconf/app-name "twitter-hashtags-example")
              (fconf/set "spark.akka.timeout" "300")
              (fconf/set conf)
              (fconf/set-executor-env env))]
    (fs/streaming-context c 2000) ))

(defonce sc (new-spark-context))

(defonce stream (TwitterUtils/createStream sc))

(fs/flat-map stream fs/print)

It is an incomplete translation of the example I found here: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala

Thank you in advance! If you can tell me what I need to do to start the streaming context, I think the rest of the translation should be pretty straightforward.

Cheers!

arnaudsj commented 10 years ago

Never mind, I had missed this at the end:

(.start sc)
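
(For a long-running job, the start call is typically paired with a blocking wait on the same streaming context; the complete listing in my next comment does this.)

;; Start the streaming computation, then block the driver
;; thread until the stream is stopped or fails.
(.start sc)
(.awaitTermination sc)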

More later!

arnaudsj commented 10 years ago

Hi @sorenmacbeth, would you mind taking a look at this? I got the stream of tweets and the hashtags extracted, but I am running into an error when trying to reduce-by-key-and-window. Please see below:

(ns core-nlp-flambo-101.core
  (:require  [flambo.api :as f]
             [flambo.streaming :as fs]
             [flambo.conf :as fconf]
             [clj-time.format :as tf]
             [clj-time.core :as tc]
             [clojure.tools.trace :refer [trace]]
             [clojure.java.shell :refer [sh]]
             [clojure.pprint :refer [pprint]]
             [org.satta.glob :refer [glob]])
  (:import   [org.apache.spark.streaming.twitter TwitterUtils]
             [org.apache.log4j Level Logger])
  (:gen-class))

(def master "local[*]")
(def conf {})
(def env {
          "spark.executor.memory" "4G",
          "spark.files.overwrite" "true"
          })

;; We don't need to see everything
(.setLevel (Logger/getRootLogger) Level/WARN)

(def ^:dynamic *app-consumer-key* "xxxx")
(def ^:dynamic *app-consumer-secret* "xxxx")
(def ^:dynamic *user-access-token* "xxxx")
(def ^:dynamic *user-access-token-secret* "xxxx")

(System/setProperty "twitter4j.oauth.consumerKey", *app-consumer-key*)
(System/setProperty "twitter4j.oauth.consumerSecret", *app-consumer-secret*)
(System/setProperty "twitter4j.oauth.accessToken", *user-access-token*)
(System/setProperty "twitter4j.oauth.accessTokenSecret", *user-access-token-secret*)

(defn new-spark-context []
  (let [c (-> (fconf/spark-conf)
              (fconf/master master)
              (fconf/app-name "core-nlp-flambo-101")
              (fconf/set "spark.akka.timeout" "300")
              (fconf/set conf)
              (fconf/set-executor-env env))]
    (fs/streaming-context c 2000) ))

(defonce sc (new-spark-context))

(fs/checkpoint sc "/tmp/")

(defonce tweet-stream (TwitterUtils/createStream sc))

;; ==================
;; Helper functions
;; ==================

(defn extract-hashtag [s]
  (re-find (re-pattern "#\\w+\\b") s))

;; ==================
;; Stream processing
;; ==================

;; Scala equivalent
;; val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

(defonce hashTags (fs/flat-map tweet-stream (fn [t] (filter #(< 0 (count %)) [(extract-hashtag (.getText t))]))))

;; Scala equivalent
;; val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
;;                      .map{case (topic, count) => (count, topic)}
;;                      .transform(_.sortByKey(false))

(defonce topCounts30 (fs/reduce-by-key-and-window hashTags (f/fn [[_ + _]]) 30000 30000))

;; =============
;; Stream output
;; =============

(fs/print topCounts30)

(.start sc)

(.awaitTermination sc)

It chokes on:

Caused by: java.lang.ClassCastException: Cannot cast flambo.function.PairFunction to org.apache.spark.api.java.function.Function

Any ideas?

Thank you for your time!

arnaudsj commented 10 years ago

This is resolved. I am going to submit a pull request.
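
For anyone who hits the same ClassCastException: the exception text suggests a mismatch between the PairFunction wrapper flambo builds and the function interface Spark expects at that point, and the reducer in my snippet, (f/fn [[_ + _]]), was also malformed (it destructures a single argument and has no body, rather than taking two counts). Here is a rough sketch of the shape the Scala example translates to; it assumes fs/map exists alongside the fs/flat-map used above and that plain vectors are accepted as key/value pairs, so it is not necessarily the exact code from the pull request:

;; Turn each hashtag into a [hashtag 1] pair, then sum the counts
;; over a 30-second window (the durations are in milliseconds).
;; NOTE: fs/map and the vector-as-pair form are assumptions; the
;; important part is that the reducer takes two arguments.
(def tag-counts
  (-> hashTags
      (fs/map (f/fn [tag] [tag 1]))
      (fs/reduce-by-key-and-window (f/fn [a b] (+ a b)) 30000 30000)))

(fs/print tag-counts)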