hortonworks-spark / shc

The Apache Spark - Apache HBase Connector is a library that lets Spark access HBase tables as an external data source or sink.
Apache License 2.0

Getting Exception like "Queries with streaming sources must be executed with writeStream.start()" in SHC Spark Structured Streaming #349

Open PrasadKumar716 opened 1 year ago

PrasadKumar716 commented 1 year ago

I have a Spark Structured Streaming application from which I'd like to write streaming data to HBase using SHC. Does anyone know a solution or workaround that would still let me use SHC to write structured streaming data to HBase? Thanks in advance!

```scala
import org.apache.spark.sql.streaming.Trigger

val rowsdf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("group.id", group_id)
  .option("maxOffsetsPerTrigger", 1000)
  .option("startingOffsets", "earliest")
  .load()

rowsdf.printSchema()

val catalog = """{
  "table": {"namespace": "default", "name": "changes", "tableCoder": "PrimitiveType"},
  "rowkey": "consumer_id",
  "columns": {
    "consumer_id": {"cf": "rowkey", "col": "consumer_id", "type": "string"},
    "audit_system_id": {"cf": "d", "col": "audit_system_id", "type": "string"},
    "object_path": {"cf": "d", "col": "object_path", "type": "string"},
    "object_type": {"cf": "d", "col": "object_type", "type": "string"},
    "what_action": {"cf": "d", "col": "what_action", "type": "string"},
    "when": {"cf": "t", "col": "when", "type": "bigint"},
    "where": {"cf": "d", "col": "where", "type": "string"},
    "who": {"cf": "d", "col": "who", "type": "string"},
    "workstation": {"cf": "d", "col": "workstation", "type": "string"}
  }
}"""

rowsdf.writeStream
  .outputMode("append")
  .format("hbase.HBaseSinkProvider")
  .option("HBaseTableCatalog.tableCatalog", catalog)
  .option("truncate", false)
  .option("checkpointLocation", "/tmp/checkpoint")
  .trigger(Trigger.ProcessingTime("180 seconds"))
  .start()
  .awaitTermination()
```

HBaseSinkProvider

```scala
package org.apache.spark.sql.execution.datasources.hbase

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}
```
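
I may be misunderstanding how the sink gets picked up, but as I read it the wiring would need to look roughly like the sketch below (assuming the provider class is compiled into the application jar): the `format(...)` string should resolve to the provider, either by its fully qualified class name or by the short name `"hbase"` if it is registered under `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister`, and the option key the sink reads (`hbasecat`) should match the key set on the writer, whereas my query above sets `HBaseTableCatalog.tableCatalog`.

```scala
// Sketch of the wiring I think the provider expects (not verified):
// - format() takes the provider's fully qualified class name, or its
//   DataSourceRegister short name "hbase" if registered via META-INF/services
// - the catalog is passed under "hbasecat", the key HBaseSink actually reads
rowsdf.writeStream
  .outputMode("append")
  .format("org.apache.spark.sql.execution.datasources.hbase.HBaseSinkProvider")
  .option("hbasecat", catalog)
  .option("checkpointLocation", "/tmp/checkpoint")
  .trigger(Trigger.ProcessingTime("180 seconds"))
  .start()
  .awaitTermination()
```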

When running the application I'm getting the following message:

    Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Queries with streaming sources must be executed with writeStream.start();;
      at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
      at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
      at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
      at scala.collection.immutable.List.foreach(List.scala:381)
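
For completeness, the workaround I'm currently looking at is `foreachBatch` (available in Spark 2.4+), which would hand each micro-batch to the existing SHC batch writer and avoid the custom `Sink` entirely. This is only a sketch, under the assumption that we can move to Spark 2.4+ and that the SHC batch data source behaves as in the non-streaming case:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// Sketch only: write each micro-batch with the SHC batch writer (Spark 2.4+).
// The batch DataFrame would still need to be parsed/projected to the catalog
// columns first, which is omitted here.
val query = rowsdf.writeStream
  .outputMode("append")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .options(Map(
        HBaseTableCatalog.tableCatalog -> catalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
  .option("checkpointLocation", "/tmp/checkpoint")
  .trigger(Trigger.ProcessingTime("180 seconds"))
  .start()

query.awaitTermination()
```

Has anyone tried this, or is there a recommended way to keep using the custom `HBaseSinkProvider` approach?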