caroljmcdonald / mapr-sparkml-streaming-uber


Unable to convert an RDD into a DataFrame #1

Open BhagyasriYella opened 5 years ago

BhagyasriYella commented 5 years ago

Hi Carol, I am using your code to run the K-means algorithm on the Uber data to analyse the high-demand pickup areas.

Thanks for the model. It is very well explained and coded. I have done the K-means modelling from part 1 and am now trying to fit the model with the test CSV file. I am following the code from here: https://github.com/caroljmcdonald/mapr-sparkml-streaming-uber/blob/master/src/main/scala/com/sparkkafka/uber/SparkKafkaConsumerProducer.scala I am able to convert the streaming data to an RDD, but I am unable to convert the RDD to a DataFrame.

scala> valuesDStream.foreachRDD {
     |  rdd => if (!rdd.isEmpty) {
     |  case class Uber(dt: String, lat: Double, lon: Double, base: String) extends Serializable
     |  //def parseUber(str: String): Uber = Uber(str(0), str(1).toDouble, str(2).toDouble, str(3))
     |  val count = rdd.count
     |   println("count received " + count)
     |  val sqlContext= new org.apache.spark.sql.SQLContext(sc)
     |  import sqlContext.implicits._
     |  rdd.collect().foreach(println)
     |  val df = rdd.map(_.split(",")).map(e => Uber(e(0), e(1).toDouble, e(2).toDouble, e(3)))
     |  val df1= sqlContext.createDataFrame(df,schema)
     |  df1.show()
     |  }
     |  }
<console>:72: error: overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[Uber], org.apache.spark.sql.types.StructType)
        val df1= sqlContext.createDataFrame(df,schema)
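For reference, the two overloads in that list that take a StructType expect an RDD[Row], not an RDD[Uber]. A minimal sketch of that path (the StructType below is an assumption, since the original `schema` value isn't shown in the snippet):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

    // assumed schema matching the four Uber fields; the original `schema` is not shown
    val schema = StructType(Seq(
      StructField("dt", StringType, true),
      StructField("lat", DoubleType, true),
      StructField("lon", DoubleType, true),
      StructField("base", StringType, true)))

    // build an RDD[Row] so the (RDD[Row], StructType) overload applies
    val rowRDD = rdd.map(_.split(",")).map(e => Row(e(0), e(1).toDouble, e(2).toDouble, e(3)))
    val df1 = sqlContext.createDataFrame(rowRDD, schema)
    df1.show()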

I have also tried it this way:

    val df = rdd.map(_.split(",")).map(e => Uber(e(0), e(1).toDouble, e(2).toDouble, e(3)))
    val df1 = df.toDF()

But I get the error:

    <console>:69: error: value toDF is not a member of org.apache.spark.rdd.RDD[Uber]
           val df1 = df.toDF()
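This `toDF` error usually means the implicit RDD-to-DataFrame conversion is not in scope for RDD[Uber]; in the spark-shell that commonly happens when the case class is defined inside the foreachRDD closure, where the encoder/TypeTag machinery cannot see it. A sketch of the usual workaround, assuming the case class is moved to the top level of the shell session:

    // define the case class at the top level of the shell, outside any closure
    case class Uber(dt: String, lat: Double, lon: Double, base: String) extends Serializable

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._  // brings rdd.toDF() into scope

    valuesDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        val df = rdd.map(_.split(",")).map(e => Uber(e(0), e(1).toDouble, e(2).toDouble, e(3))).toDF()
        df.show()
      }
    }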

Any help here would be appreciated. Thanks in advance.

caroljmcdonald commented 5 years ago

Try looking at the notebooks in the Zeppelin viewer:

https://www.zepl.com/viewer/github/caroljmcdonald/mapr-sparkml-streaming-uber/blob/master/notebooks/uberstreaming201.json

Or do this:

    linesDStream.foreachRDD(rdd => {
      val uDStream = rdd.map(_.split(",")).map(p => Uber(p(0), p(1).toDouble, p(2).toDouble, p(3)))
      uDStream.take(2).foreach(println)
    })
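Since the end goal is scoring the streaming batches with the K-means model trained in part 1, here is a hedged sketch of that step, assuming a spark.ml KMeansModel was saved earlier (the save path and column names are placeholders, not the repo's exact values):

    import org.apache.spark.ml.clustering.KMeansModel
    import org.apache.spark.ml.feature.VectorAssembler

    // placeholder path: wherever the part-1 model was saved
    val model = KMeansModel.load("/path/to/saved/kmeans-model")

    // assumes the top-level case class and sqlContext.implicits._ from the sketch above
    linesDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        val df = rdd.map(_.split(",")).map(p => Uber(p(0), p(1).toDouble, p(2).toDouble, p(3))).toDF()
        // assemble lat/lon into the feature vector the model was trained on
        val features = new VectorAssembler()
          .setInputCols(Array("lat", "lon"))
          .setOutputCol("features")
          .transform(df)
        model.transform(features).show()  // adds a "prediction" column with the cluster id
      }
    }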
