aalkilani / spark-kafka-cassandra-applying-lambda-architecture


Chapter 5 - state management using... - Zeppelin error - missing parameter type #28

Open robbie70 opened 6 years ago

robbie70 commented 6 years ago

Hi there, I am trying to run the example shown in Chapter 5 - "state management using..." and have pasted the code I am running below. When I try to execute the paragraph in Zeppelin I get the following error:

inputPath: String = file:///vagrant/input
textDStream: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@7b181d30
activityStream: org.apache.spark.streaming.dstream.DStream[Activity] = org.apache.spark.streaming.dstream.TransformedDStream@7d753e31
<console>:60: error: missing parameter type
            .map { r => ((r.getString(0), r.getLong(1)),
                   ^

I can't see the issue and have spent some time looking at it and double-checking that I copied the example from the lesson correctly. Any pointers/help would be appreciated.

Code entered into the Paragraph

val inputPath = "file:///vagrant/input"
val textDStream = ssc.textFileStream(inputPath)
val activityStream = textDStream.transform( input => {
  input.flatMap { line =>
    val record = line.split("\t")
    val MS_IN_MIN = 1000 * 60
    if (record.length == 7)
      Some(Activity(record(0).toLong / MS_IN_MIN * MS_IN_MIN,
        record(1), record(2), record(3), record(4), record(5), record(6)))
    else
      None
  }
} )

val statefulActivityByProduct = activityStream.transform( rdd => {
  val df = rdd.toDF()
  df.registerTempTable("activity")
  val activityByProduct = sqlContext.sql("""SELECT product,
                                            timestamp_hour,
                                            sum(case when action = 'purchase' then 1 else 0 end) as purchase_count,
                                            sum(case when action = 'add_to_cart' then 1 else 0 end) as add_to_cart_count,
                                            sum(case when action = 'page_view' then 1 else 0 end) as page_view_count
                                            from activity
                                            group by product, timestamp_hour """)

activityByProduct
        .map { r => ((r.getString(0), r.getLong(1)),
          ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4))
        ) }

} ).updateStateByKey((newItemsPerKey: Seq[ActivityByProduct], currentState: Option[(Long, Long, Long, Long)]) => {
    var (prevTimestamp, purchase_count, add_to_cart_count, page_view_count) =
      currentState.getOrElse((System.currentTimeMillis(), 0L, 0L, 0L))
    var result : Option[(Long, Long, Long, Long)] = null

    if (newItemsPerKey.isEmpty){
      if(System.currentTimeMillis() - prevTimestamp > 30000 + 4)
        result = None
      else
        result = Some((prevTimestamp, purchase_count, add_to_cart_count, page_view_count))
    } else {

      newItemsPerKey.foreach(a => {
        purchase_count += a.purchase_count
        add_to_cart_count += a.add_to_cart_count
        page_view_count += a.page_view_count
      })

      result = Some((System.currentTimeMillis(), purchase_count, add_to_cart_count, page_view_count))
    }

    result
  })

statefulActivityByProduct.foreachRDD(rdd => {
  rdd.map(item => ActivityByProduct(item._1._1, item._1._2, item._2._1, item._2._2, item._2._3))
    .toDF().registerTempTable("statefulActivityByProduct")
})

ssc.start()

mateos-alliaj commented 6 years ago

I have the same problem.

VineshNair83 commented 4 years ago

Were you able to resolve this error?

sher04lock commented 4 years ago

Hi all, you've probably found a solution already, but for future reference: I fixed it by converting the activityByProduct DataFrame to an RDD before calling .map:

activityByProduct.rdd.map { r =>
          ((r.getString(0), r.getLong(1)),
            ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4)))
        }
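
For anyone wondering why the .rdd conversion helps (this is my understanding, assuming the course VM runs Spark 2.x): there a DataFrame is a Dataset[Row], and Dataset.map is overloaded (a Scala closure that needs an implicit Encoder, plus a Java MapFunction variant), so the compiler cannot infer the type of r and reports "missing parameter type". RDD.map has a single signature, so after .rdd the parameter is inferred as Row. A minimal sketch of the transform block from the original post with that change applied (it assumes the Activity/ActivityByProduct case classes and sqlContext from the course notebook are already in scope):

// Sketch only: avoids the "missing parameter type" error by mapping over RDD[Row]
// instead of the DataFrame. Assumes ActivityByProduct and sqlContext are already
// defined in the notebook, as in the original post.
val statefulActivityByProduct = activityStream.transform( rdd => {
  val df = rdd.toDF()
  df.registerTempTable("activity")
  val activityByProduct = sqlContext.sql("""SELECT product, timestamp_hour,
    sum(case when action = 'purchase' then 1 else 0 end) as purchase_count,
    sum(case when action = 'add_to_cart' then 1 else 0 end) as add_to_cart_count,
    sum(case when action = 'page_view' then 1 else 0 end) as page_view_count
    from activity group by product, timestamp_hour""")

  activityByProduct
    .rdd // DataFrame -> RDD[Row]; the closure parameter below is now inferred as Row
    .map { r =>
      ((r.getString(0), r.getLong(1)),
        ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4)))
    }
}) // the .updateStateByKey(...) call from the original post can then be chained on unchanged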