RedisLabs / spark-redis

A connector for Spark that allows reading from and writing to a Redis cluster
BSD 3-Clause "New" or "Revised" License

sc.toRedis throws NotSerializableException for the stateful streaming job #51

Open kikulikov opened 7 years ago

kikulikov commented 7 years ago

I have a stateful streaming Spark job like the one below. I'd like to save some calculated values to Redis. Redis works fine without ssc.checkpoint, but as soon as I add checkpointing the job starts failing with java.io.NotSerializableException: org.apache.spark.SparkContext. I need the checkpoint to persist the state, though. How can I make spark-redis work in this setup? Am I missing something?

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object StreamingWordCount {

  case class WordItem(item: String, count: Long)

  case class WordGroup(key: String, count: Long)

  def updateUserEvents(key: String, value: Option[WordItem], state: State[WordGroup]): Option[WordGroup] = {

    val existing = {
      state.getOption().map(_.count).getOrElse(0L)
    }

    val updated = {
      value.map(s => WordGroup(key, s.count + existing)).getOrElse(WordGroup(key, existing))
    }

    state.update(updated)
    Some(updated)
  }

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |
        """.stripMargin) // TODO redis stuff as parameters
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("StreamingWordCount")
      .set("redis.host", "redis")
      .set("redis.port", "6379")
      .set("redis.auth", "bunchofmonkeys")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint")

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines: DStream[String] = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

    // Merges the current batch with the state
    val stateSpec = StateSpec.function(updateUserEvents _)
    val states = wordCounts.map(s => (s._1, WordItem(s._1, s._2))).mapWithState(stateSpec)
    states.print()

    // import com.redislabs.provider.redis._

    val prepared = states.flatMap(s => s).map(s => (s.key, s.count.toString))

    prepared.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // sc.toRedisZSET(rdd, "word_count", 0) // doesn't work here :: java.io.NotSerializableException: org.apache.spark.SparkContext
        rdd.foreach(wordGroup => {
          // sc.toRedisZSET(rdd, "word_count", 0) // doesn't fit here
          println(s"### $wordGroup")
        })
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
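
A sketch of one possible workaround, not a confirmed fix: enabling checkpointing forces Spark to serialize the DStream operation graph, so any closure that captures the driver-side sc fails with NotSerializableException. Fetching the SparkContext back from the RDD at run time inside foreachRDD avoids capturing it. This assumes the implicit toRedisZSET on SparkContext that import com.redislabs.provider.redis._ provides, as in the commented-out lines above:

import com.redislabs.provider.redis._

// Replaces the foreachRDD block in main above. The closure no longer
// references the outer `sc`; rdd.sparkContext is resolved on the driver
// when each batch runs, so nothing non-serializable ends up in the
// checkpointed closure.
prepared.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.sparkContext.toRedisZSET(rdd, "word_count", 0)
  }
}

The Redis connection settings ("redis.host", "redis.port", "redis.auth") still come from the SparkConf that rdd.sparkContext carries, so no other change should be needed.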
xingshaoqi commented 4 years ago

@kikulikov Have you solved it?

xingshaoqi commented 4 years ago

@kikulikov I have the same problem.