Azure / azure-event-hubs-spark

Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs

No records in Spark from Event Hubs Structured Streaming example #195

Closed khajaasmath786 closed 6 years ago

khajaasmath786 commented 7 years ago

I started exploring Structured Streaming with Azure Event Hubs, but I don't see any output in the console. I can see data when I use the direct create stream approach. Am I missing anything here?
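
One quick way to isolate the sink from the source is to run the same console query against Spark's built-in rate source, which always produces rows; if rate rows print but Event Hubs rows don't, the problem is on the source side. A minimal sketch, using only standard Spark 2.2+ APIs and nothing Event Hubs specific:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[2]").getOrCreate()

// The rate source emits (timestamp, value) rows at a fixed rate, so a
// correctly wired console sink prints non-empty batches right away.
val sanityQuery = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .writeStream
  .format("console")
  .outputMode("append")
  .start()

sanityQuery.awaitTermination()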

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val eventhubParameters = Map[String, String](
  "eventhubs.policyname" -> policyName,
  "eventhubs.policykey" -> policyKey,
  "eventhubs.namespace" -> eventHubNamespace,
  "eventhubs.name" -> eventHubName,
  "eventhubs.partition.count" -> partitionCount,
  "eventhubs.consumergroup" -> consumerGroup,
  "eventhubs.progressTrackingDir" -> progressDir,
  //"eventhubs.checkpoint.dir" -> checkpointLocation,
  //"eventhubs.maxRate" -> s"$maxRate",
  "eventhubs.sql.containsProperties" -> "true",
  "eventhubs.sql.userDefinedKeys" -> "creationTime,randomUserProperty"
)
val sparkConfiguration: SparkConf = new SparkConf()
sparkConfiguration.setMaster("local")
val sparkSession = SparkSession.builder.config(sparkConfiguration).getOrCreate()
val inputStream = sparkSession.readStream.format("eventhubs").options(eventhubParameters)
  .load()

//[publisher, partitionKey, body, creationTime, randomUserProperty, offset, enqueuedTime, seqNumber]
val schema = StructType(Seq(
  StructField("creationTime", StringType, true),
  StructField("body", StringType, true)
))
import sparkSession.implicits._
//val df = inputStream.selectExpr("cast (body as string) as json").select(from_json($"json", schema=schema).as("data")).select("data.*")

/* val df = inputStream.selectExpr("cast (body as string) as json")
println(df.isStreaming)
val streamingQuery1 = df.writeStream
  .outputMode("append")
  .queryName("table")
  .format("console")
  .start() */

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val streamingQuery1 = inputStream.writeStream
  .format("console")   // <-- use ConsoleSink
  .option("truncate", false)
  .option("numRows", 10)
  //.trigger(Trigger.ProcessingTime(10.seconds))
  .queryName("rate-console")
  .start()

/* val streamingQuery1 = inputStream.writeStream
  .outputMode("append")
  //.trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation", checkpointLocation)
  .format("csv")
  .option("path", outputPath + "/ETL")
  .start() */

streamingQuery1.awaitTermination()
//spark.stop()

  }
}

Output:


Batch: 0

+----+------+---------+------------+---------+------------+------------+------------------+
|body|offset|seqNumber|enqueuedTime|publisher|partitionKey|creationTime|randomUserProperty|
+----+------+---------+------------+---------+------------+------------+------------------+
+----+------+---------+------------+---------+------------+------------+------------------+
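
An empty Batch: 0 like the one above only shows that the query started; data could still arrive in later batches. One way to watch what actually comes in is a memory sink that can be polled with plain SQL. A minimal sketch with standard Spark APIs, where inputStream is the Event Hubs stream defined above and the table name eh_debug is arbitrary:

// Collect incoming rows into an in-memory table that can be queried repeatedly.
val debugQuery = inputStream.writeStream
  .format("memory")
  .queryName("eh_debug")
  .outputMode("append")
  .start()

// Poll while test events are being sent to the hub; a count that never moves
// off 0 points at the source configuration rather than the console sink.
(1 to 6).foreach { _ =>
  Thread.sleep(10000)
  sparkSession.sql("SELECT COUNT(*) AS rows_so_far FROM eh_debug").show()
}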

khajaasmath786 commented 7 years ago

I tried CSV output too, and the files it creates contain no records.

val streamingQuery1 = df.writeStream
  .outputMode("append")
  //.trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation", checkpointLocation)
  .format("csv")
  .option("path", outputPath + "/ETL")
  .start()
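
Empty part files are expected whenever a micro-batch contains no rows, so the CSV output is consistent with the empty console batches above. To rule out a write-side problem, the sink directory can be read back with an ordinary batch query; a minimal sketch with standard Spark APIs, where outputPath is the variable from the snippet above:

// Batch-read whatever the streaming file sink has committed so far.
// Reading as raw text avoids having to guess the CSV schema.
val writtenSoFar = sparkSession.read.text(outputPath + "/ETL")
println(s"Rows written so far: ${writtenSoFar.count()}")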

sabeegrewal commented 6 years ago

Hey Asmath, I'm working on this now!

khajaasmath786 commented 6 years ago

Thanks, Sabee Grewal. It would be a great help if we can figure this out. I want to use Structured Streaming instead of the direct stream; if nothing works, I will fall back to the direct stream.


sabeegrewal commented 6 years ago

Hey Asmath, sorry I got sidetracked with a few other issues. This code sample is working for me:

    import org.apache.spark.sql.SparkSession

    val ehParams = Map[String, String](
      "eventhubs.policyname" -> "RootManageSharedAccessKey",
      "eventhubs.policykey" -> "XXXXXXXXX",
      "eventhubs.namespace" -> "XXXXXXXXX",
      "eventhubs.name" -> "XXXXXXXXX",
      "eventhubs.partition.count" -> "X",
      "eventhubs.consumergroup" -> "$Default",
      "eventhubs.progressTrackingDir" -> "C:/local_test",
      "eventhubs.maxRate" -> s"20"
    )

    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._
    val inputStream = spark.readStream
      .format("eventhubs")
      .options(ehParams)
      .load()

    val streamingDataFrame = inputStream
      .selectExpr("CAST (body AS STRING) AS JSON")

    val query = streamingDataFrame.writeStream
      .outputMode("complete")
      .format("console")

    query.start().awaitTermination()
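
To get from that raw JSON string back to typed columns, the CAST above can be combined with from_json, which is essentially what the commented-out line in the original snippet was attempting. A minimal sketch with standard Spark APIs, reusing the two-field schema from earlier in the thread:

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema = StructType(Seq(
      StructField("creationTime", StringType, true),
      StructField("body", StringType, true)
    ))

    // Parse the JSON string column into a struct, then flatten it into columns.
    val parsed = streamingDataFrame
      .select(from_json($"JSON", schema).as("data"))
      .select("data.*")

    val parsedQuery = parsed.writeStream
      .outputMode("append")   // non-aggregated streaming queries use append
      .format("console")
      .start()

    parsedQuery.awaitTermination()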