stream-processing-with-spark / notebooks

Interactive Notebooks that support the book
Apache License 2.0

Can't write Dataset[SensorData] to Kafka without transformation #6

Open konobey opened 4 years ago

konobey commented 4 years ago

Hello!

The code in notebook kafka-sensor-data-generator.snb.ipynb:

val query = sensorValues.writeStream.format("kafka")
  .queryName("kafkaWriter")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaBootstrapServer) // comma-separated list of host:port
  .option("topic", targetTopic)
  .option("checkpointLocation", workDir+"/generator-checkpoint")
  .option("failOnDataLoss", "false") // use this option when testing
  .start()

doesn't work because sensorValues is of type Dataset[SensorData], while the Kafka sink expects a value column of type String (or Binary) that serializes the attributes of each SensorData row.

Kiollpt commented 4 years ago

@konobey Did you figure it out?

maasg commented 4 years ago

@Kiollpt I'm sorry for this error in the code. Kafka only accepts key-value datasets. The data must be converted into the right shape before writing to Kafka. This can be done by, for example, converting the records to JSON format.

This should do the trick:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// key: the sensor id as a string; value: all columns serialized into one JSON string
val kvStream = sensorValues.select($"sensorId".cast(StringType) as "key", to_json(struct("*")) as "value")

Then, the kvStream can be written to Kafka:

val query = kvStream.writeStream.format("kafka")
  .queryName("kafkaWriter")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaBootstrapServer) // comma-separated list of host:port
  .option("topic", targetTopic)
  .option("checkpointLocation", workDir+"/generator-checkpoint")
  .option("failOnDataLoss", "false") // use this option when testing
  .start()
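
For intuition, to_json(struct("*")) serializes every column of a row into a single JSON string that becomes the Kafka record value. A plain-Scala sketch of that shape (no Spark required), assuming hypothetical SensorData fields sensorId, timestamp, and value — the actual case class is defined in the book's notebooks and may differ:

```scala
// Hypothetical shape of a SensorData record; the real case class
// lives in the book's notebooks and may have different fields.
case class SensorData(sensorId: Long, timestamp: Long, value: Double)

// Roughly what to_json(struct("*")) would emit for one row:
// each field becomes a JSON key/value pair in the record's value string.
def toJson(s: SensorData): String =
  s"""{"sensorId":${s.sensorId},"timestamp":${s.timestamp},"value":${s.value}}"""
```

For example, toJson(SensorData(42L, 1234567890L, 21.5)) produces {"sensorId":42,"timestamp":1234567890,"value":21.5}, which Kafka stores as the record value and which from_json can later parse back into columns.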
Kiollpt commented 4 years ago

@maasg Thank you for the method

Kiollpt commented 4 years ago

For the corresponding read in Ch9, it would look like this:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions._

// Derive the schema from the SensorData case class, then parse the JSON payload back
val kafkaSchema = Encoders.product[SensorData].schema
val iotData = rawData
   .select(from_json($"value".cast("string"), kafkaSchema) as "record")
   .select("record.*").as[SensorData]
konobey commented 4 years ago

@maasg Thanks! Could you fix the code in the notebook, please?