hortonworks-spark / shc

The Apache Spark - Apache HBase Connector is a library to support Spark accessing HBase table as external data source or sink.
Apache License 2.0
552 stars 281 forks source link

HBaseTableCatalog not found while spark2-submit #314

Open ttewary92 opened 5 years ago

ttewary92 commented 5 years ago

Hello,

I am currently facing certain challenges, when writing to HBase from Spark using shc jar.

Spark 2.1.0 Hbase on cluster 1.2.0

Spark submit statement: spark2-submit --master yarn --deploy-mode client --files /etc/hbase/conf/hbase-site.xml --class com.bde.datalake.test.Prepickup /home/datalake/tt/bde_datalake_2.11-1.0.jar --jars /home/datalake/tt/shc-core-1.1.0-2.1-s_2.11.jar --conf "spark.driver.extraClassPath=/home/datalake/tt/shc-core-1.1.0-2.1-s_2.11.jar" --packages com.hortonworks:shc:1.1.0-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/repositories/releases/

Following is my code: package com.bde.datalake.test

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog import org.apache.spark.storage._ import org.apache.hadoop.hbase.{ TableName, HBaseConfiguration } import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.{ Result, Put }

object Prepickup {

val spark = SparkSession .builder() .appName("CT_Spark_parellel_test") .enableHiveSupport() .getOrCreate()

import spark.implicits._

spark.sql("set spark.dynamicAllocation.enabled = true") spark.sql("set spark.shuffle.service.enabled = true") spark.sql("set spark.dynamicAllocation.minExecutors = 10") spark.sql("set spark.dynamicAllocation.maxExecutors = 25") spark.sql("set spark.executor.cores = 5") spark.sql("set spark.driver.memory = 15g") spark.sql("set spark.executer.memory = 30g") spark.sql("set spark.driver.extraClassPath = /fae/conf/hadoop/hbase-site.xml")

def main(args : Array[String]) = {

val config = HBaseConfiguration.create()
config.set("hbase.zookeeper.quorum", "172.18.114.97")
config.set("hbase.zookeeper.property.clientPort", "2181")
config.set(TableInputFormat.INPUT_TABLE, "WFNSP.PBL_OPS_PREPICKUP")

/*config.set(TableInputFormat.SCAN_ROW_START, "startrowkey");
config.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
* hbaseConfig.set(TableInputFormat.SCAN_COLUMNS*/

var columnFamily = "CAT"

val prepickupRDD = spark.sparkContext.newAPIHadoopRDD(config, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

val prepickupDF = prepickupRDD.map(x => (Bytes.toString(x._2.getRow()), 
    Bytes.toString(x._2.getValue(Bytes.toBytes("CAT"), Bytes.toBytes("COL1"))),
    Bytes.toString(x._2.getValue(Bytes.toBytes("CAT"), Bytes.toBytes("DRLDDT")))  ))
    .toDF()
    .withColumnRenamed("_1", "PK")
    .withColumnRenamed("_2", "COL1")
    .withColumnRenamed("_3", "DRLDDT")

prepickupDF.createOrReplaceTempView("prepickupDF")

val prepickupDF1 = spark.sql(s"""select PK, SPLIT(COL1,'~')[0] AS AWB, SPLIT(COL1,'~')[1] AS STTY, SPLIT(COL1,'~')[2] AS STCD, SPLIT(COL1,'~')[3] AS STD, SPLIT(COL1,'~')[4] AS STGR, SPLIT(COL1,'~')[5] AS PURT, SPLIT(COL1,'~')[6] AS PRCD, SPLIT(COL1,'~')[7] AS PRTY, SPLIT(COL1,'~')[8] AS SPRD, SPLIT(COL1,'~')[9] AS PKTY, SPLIT(COL1,'~')[10] AS CPRCD, SPLIT(COL1,'~')[11] AS PUDT, SPLIT(COL1,'~')[12] AS PUTM, SPLIT(COL1,'~')[13] AS STDT, SPLIT(COL1,'~')[14] AS STTM, SPLIT(COL1,'~')[15] AS CMARE from prepickupDF""")

printf(prepickupDF.count().toString())

def catalog1 =
  s"""{
    |"table":{"namespace":"default", "name":"PREPICKUP1"},
    |"rowkey":"PK",
    |"columns":{
      |"PK":{"cf":"rowkey", "col":"PK", "type":"string"},
    |"AWB":{"cf":"CAT", "col":"AWB", "type":"string"},
    |"STTY":{"cf":"CAT",    "col":"STTY", "type":"string"},
    |"STCD":{"cf":"CAT",    "col":"STCD", "type":"string"},
      |"STD":{"cf":"CAT",   "col":"STD", "type":"string"},
    |"STGR":{"cf":"CAT",    "col":"STGR", "type":"string"},
    |"PURT":{"cf":"CAT",    "col":"PURT", "type":"string"},
      |"PRCD":{"cf":"CAT",  "col":"PRCD", "type":"string"},
    |"PRTY":{"cf":"CAT",    "col":"PRTY", "type":"string"},
    |"SPRD":{"cf":"CAT",    "col":"SPRD", "type":"string"},
      |"PKTY":{"cf":"CAT",  "col":"PKTY", "type":"string"},
    |"CPRCD":{"cf":"CAT",   "col":"CPRCD", "type":"string"},
    |"PUDT":{"cf":"CAT",    "col":"PUDT", "type":"string"},
      |"PUTM":{"cf":"CAT",  "col":"PUTM", "type":"string"},
    |"STDT":{"cf":"CAT",    "col":"STDT", "type":"string"},
    |"STTM":{"cf":"CAT",    "col":"STTM", "type":"string"},
      |"CMARE":{"cf":"CAT", "col":"CMARE", "type":"string"}
    |}
    |}""".stripMargin

prepickupDF1.write .options(Map(HBaseTableCatalog.tableCatalog -> catalog1, HBaseTableCatalog.newTable -> "5")) .format("org.apache.spark.sql.execution.datasources.hbase") .save()
} }

build.sbt

name := "bde_datalake"

version := "1.0"

scalaVersion in ThisBuild := "2.11.8"

val sparkVersion = "2.1.0"

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.1"

dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.6.7.1"

unmanagedJars in Compile += file("lib/shc-core-1.1.0-2.1-s_2.11.jar")

resolvers += "SparkPackages" at "https://dl.bintray.com/spark-packages/maven/" resolvers += Resolver.url("bintray-sbt-plugins", url("http://dl.bintray.com/sbt/sbt-plugin-releases")) resolvers += "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.1.0", "org.apache.spark" %% "spark-streaming" % "2.1.0",

"org.apache.kafka" % "kafka-clients" % "0.8.2.0", "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0", "org.apache.spark" %% "spark-sql" % "2.1.0", "org.apache.spark" %% "spark-hive" % "2.1.0", "org.apache.hadoop" % "hadoop-common" % "2.6.0",

//"org.apache.hbase" % "hbase-spark" % "1.2.0-cdh5.13.1", "org.apache.hbase" % "hbase-common" % "1.1.2", "org.apache.hbase" % "hbase-client" % "1.1.2", "org.apache.hbase" % "hbase-server" % "1.1.2", "org.scala-lang" % "scala-library" % "2.11.8", "org.scala-lang" % "scala-reflect" % "2.11.8", "edu.stanford.nlp" % "stanford-corenlp" % "3.7.0" , "edu.stanford.nlp" % "stanford-corenlp" % "3.7.0" % "test" classifier "models", "databricks" % "spark-corenlp" % "0.2.0-s_2.11"
)

resolvers ++= Seq( "Typesafe" at "http://repo.hortonworks.com/content/repositories/releases/", "Java.net Maven2 Repository" at "http://download.java.net/maven/2/" )

fork in run := true

I get the following error while running the code: 225926Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog$ at com.bde.datalake.test.Prepickup$.main(Prepickup.scala:135) at com.bde.datalake.test.Prepickup.main(Prepickup.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$ at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 12 more

I have added the shc jar, packages and repository to spark submit