thedataincubator / spark-structured-streaming

A short course on the new, experimental features by The Data Incubator and O'Reilly Strata.
http://shop.oreilly.com/product/0636920057482.do

Spark Structured Streaming Parsing Data using NETCAT #2

Open omkarkulkarni opened 7 years ago

omkarkulkarni commented 7 years ago

@tianhuil @cmoscardi @chrislerus

I am sending data from `nc -lk 9999` in the following format:

Name, City, Country, Age
Bob, Paris, France, 15

I need a schema for this data in Spark Structured Streaming. The way I am defining my "peopleStream", as per the tutorial, is below.

```
val peopleStream = (
  spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .schema(caseSchema)
    .option("header", true)
    .load()
)
```

I get the error below in my output:

```
omkar@rudra:~/code$ sbt package
[info] Set current project to Simple Project (in build file:/home/omkar/code/)
[info] Compiling 1 Scala source to /home/omkar/code/target/scala-2.11/classes...
[error] /home/omkar/code/src/main/scala/CustomReceiver1.scala:64: value writeStream is not a member of org.apache.spark.sql.streaming.DataStreamReader
[error] (peopleStream.writeStream()
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 11 s, completed Dec 29, 2016 7:51:01 PM
```

PFA the scala file I am trying to execute.

CustomReceiver1.scala.zip

Please help.

PS : Very new to spark and scala. Stuck for a long time.

Thanks, Omkar

tianhuil commented 7 years ago

First, it looks like you may not have the right version of Spark installed, or sbt may not be referring to the right version. Structured Streaming is a relatively new API, so please check both.

Second, it looks like the compiler thinks peopleStream is of type DataStreamReader, but I think you meant for it to be a Dataset, which actually supports writeStream. If you're new to Scala and Spark, I would suggest annotating types carefully to help with your debugging.
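To illustrate the difference between the reader and the Dataset, here is a minimal sketch in PySpark, under a few assumptions. The helper names (`people_select_exprs`, `people_from_socket`) and the column names are hypothetical, taken from the sample line "Bob, Paris, France, 15". As far as I know, the socket source delivers each line as a single string column named `value` rather than applying a user-supplied schema, so the sketch parses that column itself.

```python
# Hypothetical column layout, based on the sample line "Bob, Paris, France, 15".
PEOPLE_FIELDS = [
    ("name", "STRING"),
    ("city", "STRING"),
    ("country", "STRING"),
    ("age", "INT"),
]


def people_select_exprs(fields=PEOPLE_FIELDS):
    """Build one Spark SQL expression per field, splitting `value` on commas."""
    return [
        f"CAST(trim(split(value, ',')[{i}]) AS {dtype}) AS {name}"
        for i, (name, dtype) in enumerate(fields)
    ]


def people_from_socket(spark, host="localhost", port=9999):
    """Return a streaming DataFrame with one typed column per field.

    `spark` is assumed to be an existing SparkSession.
    """
    # spark.readStream by itself is only a DataStreamReader; it is load()
    # that returns the streaming DataFrame, which has writeStream.
    lines = (
        spark.readStream
        .format("socket")
        .option("host", host)
        .option("port", port)
        .load()
    )
    # Parse the single `value` column into typed columns.
    return lines.selectExpr(*people_select_exprs())
```

With this shape, `people_from_socket(spark).writeStream` should resolve, because the value returned comes from `load()` and not from the reader. A header line such as "Name, City, Country, Age" would produce a null `age` under the `INT` cast, so it may need to be filtered out.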

omkarkulkarni commented 7 years ago

Dear Michael,

Thank you so much for the clarification.

I would also like to know whether we can plot the streaming data that we have received in a Structured Streaming DataFrame.

For example :

###########################################################################################

# Read the lines from a socket.
from pyspark.sql.functions import split
from pyspark.sql.types import IntegerType

lines = (spark.readStream.format('socket')
         .option('host', 'localhost')
         .option('port', 9999)
         .option('includeTimestamp', 'true')
         .load())

# Set the schema and types for the DataFrame.
Data = lines.select(
    split("value", ",").getItem(0).cast(IntegerType()).alias("X"),
    split("value", ",").getItem(1).cast(IntegerType()).alias("Y"),
    lines.timestamp
)

# Write the output to the console.
query1 = Data.writeStream.outputMode('append').format('console').start()

#############################################################################################

I am able to see the output on the console (when I start a data server using nc -lk 9999 in a different terminal).

How can I pass these values to a different function to plot the data (a streaming plot), or how can I extract the latest values printed to the console into a variable (e.g. a list)? Is it even possible? When I checked 'Unsupported operations', it says "Limit and take first N rows are not supported on streaming Datasets."

PS: Your videos are really helpful, I appreciate the efforts put by you and your team.

Thanks, Omkar Kulkarni
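One possible approach to the question above, sketched under assumptions and not confirmed by the course authors: write the stream to Spark's in-memory sink and query the resulting table as an ordinary, non-streaming DataFrame. The function names and the table name `points` are hypothetical; `Data` and the `timestamp` column come from the snippet above. The memory sink keeps all rows on the driver, so it is only suitable for small volumes of data.

```python
def start_memory_sink(data, table_name="points"):
    """Start a query that appends incoming rows to an in-memory table.

    `data` is assumed to be a streaming DataFrame such as `Data` above.
    """
    return (
        data.writeStream
        .outputMode("append")
        .format("memory")
        .queryName(table_name)  # name of the table to query later
        .start()
    )


def latest_rows(rows, n=10, key="timestamp"):
    """Return the n most recent rows, oldest first, ordered by `key`."""
    if n <= 0:
        return []
    ordered = sorted(rows, key=lambda r: r[key])
    return ordered[-n:]


def snapshot(spark, table_name="points", n=10):
    """Collect the in-memory table and keep only the n most recent rows.

    This is a normal batch query, so the streaming restriction on
    limit/take does not apply to it.
    """
    rows = spark.sql(f"SELECT * FROM {table_name}").collect()
    return latest_rows(rows, n)
```

After `start_memory_sink(Data)`, calling `snapshot(spark)` periodically would return a plain Python list of rows that can be handed to any plotting function.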

