hortonworks-spark / shc

The Apache Spark - Apache HBase Connector is a library to support Spark accessing HBase table as external data source or sink.
Apache License 2.0

SHC with Spark Structured Streaming #205

Open EDALJO opened 6 years ago

EDALJO commented 6 years ago

Hi,

I have a Spark Structured Streaming application where I'd like to write streaming data to HBase using SHC. It reads data from a location where new CSV files are continuously being created. The defined catalog works for writing a DataFrame with identical data into HBase. The key components of my streaming application are a DataStreamReader and a DataStreamWriter.

val inputDataStream = spark
      .readStream
      .option("sep", ",")
      .schema(schema)
      .csv("/path/to/data/*.csv")

inputDataStream
      .writeStream
      .outputMode("append")
      .options(
        Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .start

When running the application I'm getting the following message:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.execution.datasources.hbase does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:285)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:286)
    at my.package.SHCStreamingApplication$.main(SHCStreamingApplication.scala:153)
    at my.package.SHCStreamingApplication.main(SHCStreamingApplication.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Does anyone know a solution or way/workaround to still use the SHC for writing structured streaming data to HBase? Thanks in advance!

sutugin commented 6 years ago

You can write your own custom sink provider that extends StreamSinkProvider. This is my implementation:

package HBase
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {   
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog->hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

Here is an example of how to use it:

 inputDF.
      writeStream.
      queryName("hbase writer").
      format("HBase.HBaseSinkProvider").
      option("checkpointLocation", checkPointProdPath).
      option("hbasecat", catalog).
      outputMode(OutputMode.Update()).
      trigger(Trigger.ProcessingTime(30.seconds)).
      start

EDALJO commented 6 years ago

Thanks for your answer - exactly the type of solution I was looking for. I only had time to test it quickly, but seems to be working perfectly!

sutugin commented 6 years ago

Excellent, glad to help!!!

hamroune commented 6 years ago

Thank you, this helps a lot.

merfill commented 6 years ago

I've implemented your solution with HBaseSinkProvider using the following steps:

  1. Clone shc.
  2. Put HBaseSinkProvider.scala into core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase.
  3. Compile shc.
  4. Run spark-submit with --jars some_path/shc/core/target/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar

My code is written in Python; I'm including it below. The error is: pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources must be executed with writeStream.start();; I'm new to Spark and have no experience with Scala, so I cannot understand the problem. Can you please help me with it?

def consume(schema_name, brokers, topic, group_id):
    spark = SparkSession \
        .builder \
        .appName('SparkConsumer') \
        .config('hbase.zookeeper.property.clientPort', '2282') \
        .getOrCreate()

print 'read Avro schema from file: {}...'.format(schema_name)
schema = avro.schema.parse(open(schema_name, 'rb').read())
reader = avro.io.DatumReader(schema)
print 'the schema is read'

rows = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', brokers) \
    .option('subscribe', topic) \
    .option('group.id', group_id) \
    .option('maxOffsetsPerTrigger', 1000) \
    .option("startingOffsets", "earliest") \
    .load()
rows.printSchema()

schema = StructType([ \
        StructField('consumer_id', StringType(), False), \
        StructField('audit_system_id', StringType(), False), \
        StructField('object_path', StringType(), True), \
        StructField('object_type', StringType(), False), \
        StructField('what_action', StringType(), False), \
        StructField('when', LongType(), False), \
        StructField('where', StringType(), False), \
        StructField('who', StringType(), True), \
        StructField('workstation', StringType(), True) \
    ])

def decode_avro(msg):
    bytes_reader = io.BytesIO(bytes(msg))
    decoder = avro.io.BinaryDecoder(bytes_reader)
    data = reader.read(decoder)
    return (\
            data['consumer_id'],\
            data['audit_system_id'],\
            data['object_path'],\
            data['object_type'],\
            data['what_action'],\
            data['when'],\
            data['where'],\
            data['who'],\
            data['workstation']\
           )

udf_decode_avro = udf(decode_avro, schema)

values = rows.select('value')
values.printSchema()

changes = values.withColumn('change', udf_decode_avro(col('value'))).select('change.*')
changes.printSchema()

change_catalog = '''
{
    "table":
    {
        "namespace": "default",
        "name": "changes",
        "tableCoder": "PrimitiveType"
    },
    "rowkey": "consumer_id",
    "columns":
    {
        "consumer_id": {"cf": "rowkey", "col": "consumer_id", "type": "string"},
        "audit_system_id": {"cf": "d", "col": "audit_system_id", "type": "string"},
        "object_path": {"cf": "d", "col": "object_path", "type": "string"},
        "object_type": {"cf": "d", "col": "object_type", "type": "string"},
        "what_action": {"cf": "d", "col": "what_action", "type": "string"},
        "when": {"cf": "t", "col": "when", "type": "bigint"},
        "where": {"cf": "d", "col": "where", "type": "string"},
        "who": {"cf": "d", "col": "who", "type": "string"},
        "workstation": {"cf": "d", "col": "workstation", "type": "string"}
    }
}'''

query = changes \
    .writeStream \
    .format('HBase.HBaseSinkProvider')\
    .option('hbasecat', change_catalog) \
    .option("checkpointLocation", '/tmp/checkpoint') \
    .outputMode("append") \
    .start()

query.awaitTermination()

sutugin commented 6 years ago

Try writing to the console or to a file. Do you get the same error?

merfill commented 6 years ago

No, when I write records to the console, everything is OK. I'm using the following Python code instead of HBase.HBaseSinkProvider:

query = changes \
         .writeStream \
        .outputMode("append") \
        .format('console') \
        .start()

The output is something like this:

Batch: 7

+-----------+---------------+--------------------+-----------+-----------+-------------+-------------+----------------+--------------------+
|consumer_id|audit_system_id|         object_path|object_type|what_action|         when|        where|             who|         workstation|
+-----------+---------------+--------------------+-----------+-----------+-------------+-------------+----------------+--------------------+
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|      Added|1520171584122|172.28.26.190|PD6\fsatestuser1|win10test1.pd6.local|
|        111|            222| \172.28.26.190\T...|       File|      Added|1520171584126|172.28.26.190|PD6\fsatestuser1|win10test1.pd6.local|
+-----------+---------------+--------------------+-----------+-----------+-------------+-------------+----------------+--------------------+
only showing top 20 rows

2018-06-27 14:01:26 INFO WriteToDataSourceV2Exec:54 - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@30f35fc5 committed.
2018-06-27 14:01:26 INFO SparkContext:54 - Starting job: start at NativeMethodAccessorImpl.java:0
2018-06-27 14:01:26 INFO DAGScheduler:54 - Job 15 finished: start at NativeMethodAccessorImpl.java:0, took 0,000037 s
2018-06-27 14:01:26 INFO MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "1e6076ad-b403-46ca-9438-e4913660700d",
  "runId" : "7740c36b-dc2d-4880-8ea6-0efde89b1ef5",
  "name" : null,
  "timestamp" : "2018-06-27T11:01:26.389Z",
  "batchId" : 7,
  "numInputRows" : 669,
  "inputRowsPerSecond" : 883.7516512549538,
  "processedRowsPerSecond" : 1327.3809523809523,
  "durationMs" : {
    "addBatch" : 471,
    "getBatch" : 4,
    "getOffset" : 2,
    "queryPlanning" : 11,
    "triggerExecution" : 503,
    "walCommit" : 15
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[changes]]",
    "startOffset" : {
      "changes" : {
        "0" : 7000
      }
    },
    "endOffset" : {
      "changes" : {
        "0" : 7669
      }
    },
    "numInputRows" : 669,
    "inputRowsPerSecond" : 883.7516512549538,
    "processedRowsPerSecond" : 1327.3809523809523
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@3715c84c"
  }
}

sutugin commented 6 years ago

Unfortunately, I am without a computer right now. Try setting the outputMode to update. If that does not help and you cannot find a solution, email me after July 5 and I will try to help.

merfill commented 6 years ago

Unfortunately, "update" mode does not work either. I received the same error (see below). Thank you in advance.

pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources must be executed with writeStream.start();;\nLogicalRDD [key#52, value#53, topic#54, partition#55, offset#56L, timestamp#57, timestampType#58], true\n\n=== Streaming Query ===\nIdentifier: [id = 66509eea-e706-4745-9b35-f82f09752b43, runId = c0e78fe7-f978-4f64-88cd-801d766f78c5]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {KafkaSource[Subscribe[changes]]: {"changes":{"0":7669}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [change#22.consumer_id AS consumer_id#25, change#22.audit_system_id AS audit_system_id#26, change#22.object_path AS object_path#27, change#22.object_type AS object_type#28, change#22.what_action AS what_action#29, change#22.when AS when#30L, change#22.where AS where#31, change#22.who AS who#32, change#22.workstation AS workstation#33]\n+- Project [value#8, decode_avro(value#8) AS change#22]\n +- Project [value#8]\n +- StreamingExecutionRelation KafkaSource[Subscribe[changes]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'

ddkongbb commented 6 years ago

Please try this version of shc (https://github.com/sutugin/shc) and compile it against the corresponding HBase/Phoenix version. I have used it without the Avro format and it worked perfectly.

swarup5s commented 5 years ago

Hello @sutugin. I've implemented your solution. However, data is not getting updated in HBase, and no exception is thrown either. Can you suggest anything in this regard?

sutugin commented 5 years ago

Hi @swarup5s, if you share the implementation code, show how you use it, and post the logs, maybe we can find the problem together.

swarup5s commented 5 years ago

Hi @sutugin thanks for your help. Appreciate it.

//this class is under ...org/apache/spark/sql/execution/streaming/
//path. I'm executing from IntelliJ..
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.execution.datasources.hbase.Logging
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog->hBaseCatalog))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

/* My code goes here------------------------------------------------------------------------------ */
//...
//...

def catalog = s"""{
  |"table":{"namespace":"default", "name":"hbase_table"},
  |"rowkey":"key",
  |"columns":{
  |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
  |"subscriberName":{"cf":"subscriberInfo", "col":"Name", "type":"string"},
  |"subscriberNumber":{"cf":"subscriberInfo", "col":"PhoneNumber", "type":"string"},
  |"messageTemplate":{"cf":"messageInfo", "col":"template", "type":"string"},
  |"lastTS":{"cf":"messageInfo", "col":"ts", "type":"String"}
  |}
  |}""".stripMargin

def withCatalog(cat: String): DataFrame = {
  spark.sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog->cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}
val BaseRecorddf = withCatalog(catalog) //record from HBase as Batch or normal Dataframe

val streamdf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_addr")
  .option("subscribe", "topic_name")
  .option("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  .option("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  .option("auto.offset.reset","earliest")
  .option("group.id","realTimeStream")
  .option("enable.auto.commit",true: java.lang.Boolean)
  .load()

//streaming Dataframe here
val schemaStreamDf = streamdf
  .selectExpr("CAST(key AS STRING) as IMSI", "CAST(value AS STRING) as Loc")

//...

//some biz logic here and some join between the batch and streaming dataframe and finaldf is the streaming DF
//which should be written back to the HBase in real time.

try{

    finaldfd.
      writeStream.
      format("org.apache.spark.sql.execution.streaming.HBaseSinkProvider").
      option("checkpointLocation", "some_path_here").
      option("hbasecat",catalog).
      outputMode(OutputMode.Update()).
      trigger(Trigger.ProcessingTime(some_seconds))
      .start()
      .awaitTermination()

  }
catch {
  case e: Exception => println(e)
}

I've made some changes. Now a StreamingQueryException is thrown, but being a novice I'm not sure what is going wrong. Here are the logs:

org.apache.spark.sql.streaming.StreamingQueryException: null
=== Streaming Query ===
Identifier: [id = a0997feb-efeb-4976-b8eb-5efaa3c1b8c9, runId = 6f4181f0-ffd4-4aa9-a0bc-475457e4e93e]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[topic_name]]: {"topic_name":{"2":282,"1":305,"0":293}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [rowkey#75, subscriberName#1, subscriberNumber#2, messageTemplate#3, 1537292624 AS lastTS#82]
+- Project [rowkey#11 AS rowkey#75, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4]
   +- Filter (cast(cast(lastTS#4 as bigint) as double) < 1.536990223E9)
      +- Project [rowkey#11, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4]
         +- Project [rowkey#11, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4, Loc#46]
            +- Join Inner, (rowkey#11 = rowkey#45)
               :- Project [rowkey#0 AS rowkey#11, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4]
               :  +- Relation[rowkey#0,subscriberName#1,subscriberNumber#2,messageTemplate#3,lastTS#4] HBaseRelation(Map(catalog -> {
"table":{"namespace":"default", "name":"hbase_table"},
"rowkey":"key",
"columns":{
"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
"subscriberName":{"cf":"subscriberInfo", "col":"Name", "type":"string"},
"subscriberNumber":{"cf":"subscriberInfo", "col":"PhoneNumber", "type":"string"},
"messageTemplate":{"cf":"messageInfo", "col":"template", "type":"string"},
"lastTS":{"cf":"messageInfo", "col":"ts", "type":"String"}
}
}),None)
               +- Project [cast(key#30 as string) AS rowkey#45, cast(value#31 as string) AS Loc#46]
                  +- StreamingExecutionRelation KafkaSource[Subscribe[topic_name]], [key#30, value#31, topic#32, partition#33, offset#34L, timestamp#35, timestampType#36]

On the other hand, messages are successfully written to the console.

stefcorda commented 5 years ago

Hello @sutugin, first of all thank you for your great help. I'm experiencing @swarup5s's problem when I call:

data.sparkSession.createDataFrame(data.rdd, data.schema)

The same happens whenever I call something like data.rdd

I think the problem is outside your code, somewhere else. Maybe it is more Spark-related?

sutugin commented 5 years ago

Hi @sympho410! Unfortunately, I can't find the problem from a few lines of code; I would need to debug and look for the cause. My only guess: try changing the order of the columns to match the order you specified in the schema.

stefcorda commented 5 years ago

Hello @sutugin, I noticed just now you texted back! I managed to get past that problem thanks to another one of your commits:

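// Imports this fragment needs, in addition to those of HBaseSink
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters

// Inside addBatch: convert the internal rows instead of calling data.rdd
val schema = data.schema

val res = data.queryExecution.toRdd.mapPartitions { rows =>
  val converter = CatalystTypeConverters.createToScalaConverter(schema)
  rows.map(converter(_).asInstanceOf[Row])
}

val df = data.sparkSession.createDataFrame(res, schema)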

Now it works properly - Thank you again for your help :)

vibnimit commented 5 years ago

Hello @sutugin and @sympho410, I am working on a similar problem: I want to do bulk puts to HBase from Spark Structured Streaming. I see that the code above tries to do that, but what I don't understand is the use of the catalog here. It looks like a kind of predefined schema. But since HBase is schema-less, I can add new columns in the future, so how can I fix a catalog in advance? Can either of you explain what I am missing, and what exactly the purpose and meaning of the catalog is? Also, can anyone explain what "5" means in this line: HBaseTableCatalog.newTable -> "5"? Any help is greatly appreciated.

Thanks in advance!

sutugin commented 5 years ago

It seems to me that the purpose of the catalog is to structure the data properly for serialization and deserialization. The need to specify the schema is a feature of this library's implementation and is not tied to Structured Streaming. You can try to work around this limitation by generating a schema on the fly, based on the schema of the data inside each batch, but you must be sure that all rows inside the batch have the same schema; alternatively, try using a foreach writer and derive the schema for each row separately.
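
To make the idea of generating the catalog on the fly more concrete, here is a minimal Python sketch that builds a catalog string from a list of (column name, type) pairs, following the JSON layout of the catalogs posted earlier in this thread. The helper name `build_catalog` and the choice to put every non-key column into a single column family are assumptions made for illustration; they are not part of SHC.

```python
import json


def build_catalog(table, rowkey, columns, cf="d", namespace="default"):
    """Build an SHC-style catalog JSON string.

    `columns` is a list of (name, catalog_type) pairs, e.g. ("when", "bigint").
    Hypothetical helper: every non-key column is placed in one column
    family `cf`, which is a simplification chosen for this sketch.
    """
    names = [name for name, _ in columns]
    if rowkey not in names:
        raise ValueError("rowkey column %r is not in columns" % rowkey)

    mapping = {}
    for name, ctype in columns:
        # The row key column uses the special column family "rowkey",
        # as in the catalogs shown in this thread.
        family = "rowkey" if name == rowkey else cf
        mapping[name] = {"cf": family, "col": name, "type": ctype}

    catalog = {
        "table": {"namespace": namespace, "name": table},
        "rowkey": rowkey,
        "columns": mapping,
    }
    return json.dumps(catalog)


catalog = build_catalog(
    "changes",
    "consumer_id",
    [("consumer_id", "string"), ("when", "bigint"), ("who", "string")],
)
print(catalog)
```

In PySpark the column list could probably be taken from `df.dtypes`, which returns (name, type) pairs, but whether every Spark type name is accepted by the catalog parser is something to verify against the SHC version in use.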

omkarahane commented 4 years ago

Can anyone please provide a compiled jar with HBaseSink compiled in? I tried building the shc project, but I get this error:

HBaseRelation.scala:108: value foreach is not a member of Nothing
        hBaseConfiguration.foreach(_.foreach(e => conf.set(e._1, e._2)))
                                     ^

I tried implementing the "hbaseSink" class in my project and used SHC as a Maven dependency, but it is not working; I get this error:

Queries with streaming sources must be executed with writeStream.start();

It would be very helpful if I could get the compiled jar. Thanks.

sutugin commented 4 years ago

Try to build from my fork (https://github.com/sutugin/shc), though I have not updated it for a long time. Just specify the Spark version you actually use in pom.xml; for me it is

<spark.version>2.3.1</spark.version>

omkarahane commented 4 years ago

@sutugin Thanks for replying. I'm working on the Databricks platform, which hosts Spark 2.4.3, so I have access to the foreachBatch API; hence the above is not needed anymore.
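
For readers on Spark 2.4 or later, a minimal PySpark sketch of the `foreachBatch` approach follows. Inside `foreachBatch` each micro-batch arrives as an ordinary DataFrame, so the regular batch writer of SHC can be used and no custom sink is needed. The function names, the default region count, and the checkpoint path are illustrative assumptions; the option keys `catalog` and `newtable` are assumed to be the string values behind `HBaseTableCatalog.tableCatalog` and `HBaseTableCatalog.newTable`.

```python
SHC_FORMAT = "org.apache.spark.sql.execution.datasources.hbase"


def make_hbase_batch_writer(catalog, new_table_regions="5"):
    """Return a function suitable for DataStreamWriter.foreachBatch.

    `catalog` is the SHC catalog JSON string. `new_table_regions` is passed
    as the `newtable` option (assumed to be the number of regions SHC uses
    if it has to create the table).
    """
    def write_batch(batch_df, batch_id):
        # batch_df is a plain (non-streaming) DataFrame, so the batch
        # writer API is available here.
        (batch_df.write
            .options(catalog=catalog, newtable=new_table_regions)
            .format(SHC_FORMAT)
            .save())

    return write_batch


# Usage with a streaming DataFrame `changes` (requires Spark 2.4+ and the
# SHC jar on the classpath; not executed in this sketch):
#
# query = (changes.writeStream
#     .foreachBatch(make_hbase_batch_writer(change_catalog))
#     .option("checkpointLocation", "/tmp/checkpoint")
#     .start())
# query.awaitTermination()
```

Returning a closure keeps the catalog out of global state, so the same helper can serve several streams that write to different tables.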

sutugin commented 4 years ago

@omkarahane Good idea, I already wrote to someone about this method: https://github.com/hortonworks-spark/shc/pull/238#issuecomment-447698100

omkarahane commented 4 years ago

@sutugin I'm still facing an issue. It is different from the one I mentioned above. Here is the link where I have described all the details; please see if you can help. https://stackoverflow.com/questions/58306725/not-able-to-write-data-to-hbase-using-spark-structured-streaming

Thanks.

sutugin commented 4 years ago

@omkarahane, try making a "fat" jar with the sbt dependency libraryDependencies += "com.hortonworks.shc" % "shc-core" % "1.1.0.3.1.2.1-1". I think this will solve the problem with "java.lang.NoClassDefFoundError".

omkarahane commented 4 years ago

I tried running the job with a fat jar created using Maven, but the issue still wasn't resolved. I guess fat jars created with sbt and Maven would be almost the same?

sutugin commented 4 years ago

@omkarahane, maybe this will help you https://github.com/hortonworks-spark/shc/issues/223#issuecomment-375111619

omkarahane commented 4 years ago

@sutugin, thanks a lot, you pointed me in the right direction. The HBase jars were missing; I have added those jars and installed them as a library on the cluster so the job has access to them. That solved my initial problem, but now I'm getting another exception:

java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;

This also seems to be a dependency issue. This is what I have tried:

  1. Uploaded the following jars: json4s-ast, json4s-core,json4s-jackson
  2. Versions tried, 3.4,3.5,3.6
  3. Put a maven dependency in my jar, built fat jar and uploaded it as library, so transitive dependencies can be satisfied.

Still getting the same error.

sutugin commented 4 years ago

@omkarahane, Similar problems are described here:

631068264 commented 4 years ago

@sutugin @merfill I added sutugin's Scala code and compiled it as merfill described.

But I get an error. I use Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;

public class KafkaStructStream implements Serializable {

    private String servers;
    private String jks;
    private String schema;

    public KafkaStructStream(String[] args) {
//        this.servers = args[0];
//        this.jks = args[1];
    }

    private Dataset<Row> initStructKafka() throws IOException {
        Properties prop = Config.getProp();
        this.schema = prop.getProperty("hbase.traffic.schema");
        SparkSession spark = SparkSession
                .builder()
                .appName("Kafka")
                .master("local[*]")
                .getOrCreate();
        return spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", prop.getProperty("kafka.broker.list"))
                .option("kafka.ssl.truststore.location", Config.getPath(Config.KAFKA_JKS))
//                .option("kafka.bootstrap.servers", this.servers)
//                .option("kafka.ssl.truststore.location", this.jks)
                .option("kafka.ssl.truststore.password", prop.getProperty("kafka.jks.passwd"))
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.endpoint.identification.algorithm", "")
                .option("startingOffsets", "latest")
//                .option("subscribe", kafkaProp.getProperty("kafka.topic"))

                .option("subscribe", "traffic")
                .load()
                .selectExpr("CAST(topic AS STRING)", "CAST(value AS STRING)");
    }

    private void run() {
        Dataset<Row> df = null;
        try {
            df = initStructKafka();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        df.printSchema();

        StructType trafficSchema = new StructType()
                .add("guid", DataTypes.StringType)
                .add("time", DataTypes.LongType)
                .add("end_time", DataTypes.LongType)
                .add("srcip", DataTypes.StringType)
                .add("srcmac", DataTypes.StringType)
                .add("srcport", DataTypes.IntegerType)
                .add("destip", DataTypes.StringType)
                .add("destmac", DataTypes.StringType)
                .add("destport", DataTypes.IntegerType)
                .add("proto", DataTypes.StringType)
                .add("appproto", DataTypes.StringType)
                .add("upsize", DataTypes.LongType)
                .add("downsize", DataTypes.LongType);

        Dataset<Row> ds = df.select(functions.from_json(df.col("value").cast(DataTypes.StringType), trafficSchema).as("data")).select("data.*");
        StreamingQuery query = ds.writeStream()
                .format("HBase.HBaseSinkProvider")
                .option("HBaseTableCatalog.tableCatalog", this.schema)
                .option("checkpointLocation", "/tmp/checkpoint")
                .start();

//        StreamingQuery query = ds.writeStream().format("console")
//                .trigger(Trigger.Continuous("2 seconds"))
//                .start();
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        KafkaStructStream k = new KafkaStructStream(args);
        k.run();
    }

}

ERROR

19/11/20 15:35:09 ERROR MicroBatchExecution: Query [id = 3f3688bb-6c3d-45bc-ab33-23968069abc0, runId = 0346659e-cb5f-4ee2-919a-00ca124e1e3e] terminated with error
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72], true

    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2980)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2978)
    at HBase.HBaseSink.addBatch(HBaseSinkProvider.scala:14)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
org.apache.spark.sql.streaming.StreamingQueryException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72], true

=== Streaming Query ===
Identifier: [id = 3f3688bb-6c3d-45bc-ab33-23968069abc0, runId = 0346659e-cb5f-4ee2-919a-00ca124e1e3e]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[traffic]]: {"traffic":{"0":118641202}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [data#25.guid AS guid#27, data#25.time AS time#28L, data#25.end_time AS end_time#29L, data#25.srcip AS srcip#30, data#25.srcmac AS srcmac#31, data#25.srcport AS srcport#32, data#25.destip AS destip#33, data#25.destmac AS destmac#34, data#25.destport AS destport#35, data#25.proto AS proto#36, data#25.appproto AS appproto#37, data#25.upsize AS upsize#38L, data#25.downsize AS downsize#39L]
+- Project [jsontostructs(StructField(guid,StringType,true), StructField(time,LongType,true), StructField(end_time,LongType,true), StructField(srcip,StringType,true), StructField(srcmac,StringType,true), StructField(srcport,IntegerType,true), StructField(destip,StringType,true), StructField(destmac,StringType,true), StructField(destport,IntegerType,true), StructField(proto,StringType,true), StructField(appproto,StringType,true), StructField(upsize,LongType,true), StructField(downsize,LongType,true), cast(value#22 as string), Some(Asia/Shanghai), true) AS data#25]
   +- Project [cast(topic#9 as string) AS topic#21, cast(value#8 as string) AS value#22]
      +- StreamingExecutionRelation KafkaSource[Subscribe[traffic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72], true

    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2980)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2978)
    at HBase.HBaseSink.addBatch(HBaseSinkProvider.scala:14)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more
sutugin commented 4 years ago

Hi @631068264, if you build from my fork, try specifying the format: "org.apache.spark.sql.execution.streaming.HBaseStreamSinkProvider" or "hbase"

Saimukunth commented 4 years ago

Hi,

I'm running Spark Structured Streaming over messages ingested from Kafka and, after processing, storing the data in HBase. The following error comes up:

**ERROR ConnectionManager$HConnectionImplementation: The node /hbase is not in ZooKeeper. It should have been written by the master. Check the value configured in 'zookeeper.znode.parent'. There could be a mismatch with the one configured in the master.**

I tried passing hbase-site.xml to spark-submit, but with no luck. The hbase-site.xml sets the property "zookeeper.znode.parent" to "/hbase-unsecure".
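One quick check for this kind of mismatch is to confirm what the hbase-site.xml being shipped actually declares. The following is a minimal sketch using only the JDK's XML parser; the class `ZnodeParentCheck` and the method `readProperty` are hypothetical names for illustration and are not part of SHC or HBase. It assumes the usual Hadoop-style layout of `<property>` elements with `<name>` and `<value>` children.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper for illustration; not part of SHC or HBase.
final class ZnodeParentCheck {

    // Returns the <value> of the <property> whose <name> matches, or null if absent.
    static String readProperty(String siteXml, String name) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(siteXml.getBytes(StandardCharsets.UTF_8)));
            NodeList props = doc.getElementsByTagName("property");
            for (int i = 0; i < props.getLength(); i++) {
                Element prop = (Element) props.item(i);
                String propName = prop.getElementsByTagName("name").item(0).getTextContent().trim();
                if (propName.equals(name)) {
                    return prop.getElementsByTagName("value").item(0).getTextContent().trim();
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse the site XML", e);
        }
    }

    public static void main(String[] args) {
        // Inline sample mirroring the value reported in this comment.
        String xml = "<configuration><property><name>zookeeper.znode.parent</name>"
                + "<value>/hbase-unsecure</value></property></configuration>";
        String parent = readProperty(xml, "zookeeper.znode.parent");
        // When the property is not visible to the client, /hbase is the default it looks for.
        System.out.println(parent == null ? "/hbase (default)" : parent);
    }
}
```

If the file declares `/hbase-unsecure` but the client still looks under `/hbase`, as the error message suggests, the file is probably not reaching the code that opens the HBase connection.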

My spark-submit parameters are:

    spark-submit \
      --class CountryCountStreaming \
      --master yarn-client \
      --conf spark.ui.port=4926 \
      --jars $(echo /home/venkateshramanpc5546/external_jars/*.jar | tr ' ' ',') \
      --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 \
      --repositories http://repo.hortonworks.com/content/groups/public/ \
      --files /usr/hdp/current/hbase-client/conf/hbase-site.xml \
      kafkasparkstreamingdemo_2.11-0.1.jar cloudxlab /tmp/venkatesh/retail_schema/Retail_Logs.json /tmp/venkatesh/retail_checkpoint

The stack version in the cluster is given below:

Hadoop 2.7.3, HBase 1.1.2, Zookeeper 3.4.6, Kafka 0.10.1, Spark 2.1.1

Please find the build.sbt and the Scala classes attached for your reference.

Kindly let me know whether any HBase configuration (ZooKeeper quorum, ZooKeeper client port, ZooKeeper znode parent) can be set in the step where we write data to a table, which is:

    df.write.
      options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
        HBaseTableCatalog.newTable -> "4")).
      format(defaultFormat).
      save()

StructuredStreaming.zip

sutugin commented 4 years ago

Hi, @Saimukunth! Is the error also reproduced if you write in batch mode rather than with structured streaming? I think the problem is the same as https://github.com/hortonworks-spark/shc/issues/150

Saimukunth commented 4 years ago

If I do it through legacy streaming, the error is not reproduced and I'm able to insert the data into HBase, because there I create the HBase connection with the appropriate configuration.

Thanks, Venkatesh Raman

sutugin commented 4 years ago

@Saimukunth Try the approach described in the second case here: https://github.com/hortonworks-spark/shc/issues/150#issuecomment-310871728

Saimukunth commented 4 years ago

Hi,

Thanks, I was able to resolve the above issue using the shc jar. Data is now getting inserted into HBase, but not in the layout I wanted.

**Expected:**

    2018-08-27T01:57:00.000Z column=metrics:Algeria, timestamp=1535335088874, value=1
    2018-08-27T01:57:00.000Z column=metrics:Brazil, timestamp=1535335089093, value=1
    2018-08-27T01:57:00.000Z column=metrics:Canada, timestamp=1535335088664, value=1
    2018-08-27T01:57:00.000Z column=metrics:China, timestamp=1535335089345, value=3
    2018-08-27T01:57:00.000Z column=metrics:Czechia, timestamp=1535335088651, value=1
    2018-08-27T01:57:00.000Z column=metrics:Hong Kong, timestamp=1535335089496, value=1

**Actual:**

    2020-05-06T10:52:00.000Z column=metrics:countryCount, timestamp=1588762512336, value=1
    2020-05-06T10:52:00.000Z column=metrics:countryName, timestamp=1588762512336, value=United Kingdom
    2020-05-06T10:53:00.000Z column=metrics:countryCount, timestamp=1588762508359, value=3
    2020-05-06T10:53:00.000Z column=metrics:countryName, timestamp=1588762508359, value=Brazil

This is my hbase catalog file,

    def catalog = s"""{
      |"table":{"namespace":"default", "name":"country_count"},
      |"rowkey":"window",
      |"columns":{
      |"window":{"cf":"rowkey", "col":"window", "type":"string"},
      |"countryName":{"cf":"metrics", "col":"countryName", "type":"string"},
      |"countryCount":{"cf":"metrics", "col":"countryCount", "type":"int"}
      |}
      |}""".stripMargin

    val getCountryCountDF: DataFrame = spark.sql("select countryName, " +
        "cast(rounded_timestamp as string) as window, count(1) as countryCount from retail " +
        "group by countryName, cast(rounded_timestamp as string)").
      selectExpr(
        "window",
        "countryName",
        "CAST(countryCount as STRING)")

    val finalDF = getCountryCountDF.
      writeStream.
      queryName("Retail Logs Writer").
      format("HBase.HBaseStreamSinkProvider").
      option("hbasecatalog", catalog).
      option("checkpointLocation", args(2)).
      outputMode("update").
      trigger(Trigger.ProcessingTime("20 seconds")).
      start

In the HBaseStreamSinkProvider, I'm writing to hbase using,

    val schema = data.schema
    val res: RDD[Row] = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }
    val df = data.sparkSession.createDataFrame(res, schema)
    df.write.
      options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
        HBaseTableCatalog.newTable -> "4")).
      format(defaultFormat).
      save()

Is there any way I can adapt the HBaseTableCatalog class to get the desired result?

Thanks and regards, Venkatesh Raman
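The difference between the expected and actual layouts in the comment above comes down to what ends up as the column qualifier. The catalog declares two fixed qualifiers (`countryName` and `countryCount`), while the expected output uses each country name as the qualifier and the count as the cell value. The reshaping itself can be sketched in plain Java, independent of Spark and HBase. `CountryPivot`, `CountRow`, and `pivot` are hypothetical names used only for illustration, and how the resulting cells would be written to HBase is deliberately left out.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names come from SHC or HBase.
final class CountryPivot {

    // One aggregated row as produced by the query: (window, countryName, countryCount).
    static final class CountRow {
        final String window;
        final String countryName;
        final int countryCount;

        CountRow(String window, String countryName, int countryCount) {
            this.window = window;
            this.countryName = countryName;
            this.countryCount = countryCount;
        }
    }

    // Groups rows by window (the row key). Inside each window the country name
    // becomes the qualifier and the count becomes the cell value.
    static Map<String, Map<String, Integer>> pivot(List<CountRow> rows) {
        Map<String, Map<String, Integer>> cellsByRowKey = new LinkedHashMap<>();
        for (CountRow row : rows) {
            cellsByRowKey
                    .computeIfAbsent(row.window, w -> new TreeMap<>())
                    .put(row.countryName, row.countryCount);
        }
        return cellsByRowKey;
    }

    public static void main(String[] args) {
        List<CountRow> rows = Arrays.asList(
                new CountRow("2018-08-27T01:57:00.000Z", "Algeria", 1),
                new CountRow("2018-08-27T01:57:00.000Z", "China", 3));
        // Prints one line per cell, with the country name in the qualifier position.
        pivot(rows).forEach((window, cells) ->
                cells.forEach((country, count) ->
                        System.out.println(window + " column=metrics:" + country + ", value=" + count)));
    }
}
```

Because the set of qualifiers here depends on the data, it cannot be listed up front in a catalog with a fixed "columns" section like the one in the comment above. The sketch only shows the target shape, not a way to make the catalog produce it.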