hortonworks-spark / shc

The Apache Spark - Apache HBase Connector is a library that lets Spark access HBase tables as an external data source or sink.
Apache License 2.0

How to use the Avro tableCoder in PySpark for writing a DataFrame to HBase? #274

Closed vineelavelagapudi closed 6 years ago

vineelavelagapudi commented 6 years ago

import json

avroSchema = json.dumps({"namespace": "doctors.avro", "type": "record", "name": "user", "fields": [
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"},
    {"name": "number", "type": "int"}
]})

df = spark.read.format("com.databricks.spark.avro").option("schema1", avroSchema).load("path_of_file")

hbaseschema3 = json.dumps({
    "table": {"namespace": "default", "name": "trip_avro", "tableCoder": "avro"},
    "rowkey": "key1",
    "columns": {
        "col1": {"cf": "rowkey", "col": "key1", "type": "int"},
        "col2": {"cf": "sales", "col": "fields", "avro": "avroSchema"}
    }
})

df.write.option("catalog", hbaseschema3).option("newtable", "5").option("avroSchema", avroSchema).format("org.apache.spark.sql.execution.datasources.hbase").save()

I tried the code above, following https://github.com/hortonworks-spark/shc/wiki/2.-Native-Avro-Support, but I am getting the error 'java.lang.ClassNotFoundException: avro'. Can anyone give a solution?

weiqingy commented 6 years ago

Hi @vineelavelagapudi
As step 1 in https://github.com/hortonworks-spark/shc/wiki/2.-Native-Avro-Support shows, when writing data you should use "type":"binary".
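For reference, a minimal, untested sketch of a write-side catalog that declares the Avro column with "type":"binary", using the table and column names from this thread and assuming df's col2 already holds the Avro-serialized bytes:

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// Write-side catalog: the Avro payload column is declared as "binary".
val writeCatalog = s"""{
  "table": {"namespace": "default", "name": "trip_avro"},
  "rowkey": "key1",
  "columns": {
    "col1": {"cf": "rowkey", "col": "key1", "type": "int"},
    "col2": {"cf": "sales", "col": "col2", "type": "binary"}
  }
}""".stripMargin

// Assumes df's col2 already contains the Avro-serialized record as Array[Byte].
df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

The avro-mapped catalog would then typically come into play when reading the data back, per the wiki.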

vineelavelagapudi commented 6 years ago

Thank you @weiqingy for your response, but I am still getting the same error. I tried it in Scala:

val schemaString = s"""{"namespace": "doctors.avro", "type": "record", "name": "user", "fields": [

    {"name":"first_name", "type":"string"},
    {"name":"last_name", "type":"string"},
    {"name":"number", "type":"int"}
]   }""".stripMargin

def catalog = s"""{
  "table": {"namespace": "default", "name": "trip_avro"},
  "rowkey": "key1",
  "columns": {
    "col1": {"cf": "rowkey", "col": "key1", "type": "int"},
    "col2": {"cf": "sales", "col": "col2", "type": "binary"}
  }
}""".stripMargin

def avroCatalog = s"""{
  "table": {"namespace": "default", "name": "trip_avro", "tableCoder": "avro"},
  "rowkey": "key1",
  "columns": {
    "col1": {"cf": "rowkey", "col": "key1", "type": "int"},
    "col2": {"cf": "sales", "col": "col2", "avro": "avroSchema"}
  }
}""".stripMargin

import org.apache.avro.Schema

val avroSchema: Schema = {
  val p = new Schema.Parser
  p.parse(schemaString)
}

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// Note: both catalogs are passed under the same Map key (HBaseTableCatalog.tableCatalog), so only the last one, avroCatalog, actually reaches the data source.
val df = spark.read.format("com.databricks.spark.avro").options(Map("avroSchema" -> schemaString, HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.tableCatalog -> avroCatalog)).load("path-of-file")

df.write.options(Map("avroSchema" -> schemaString, HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.tableCatalog -> avroCatalog, "newTable" -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()

java.lang.ClassNotFoundException: avro
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:264)
  at org.apache.spark.sql.execution.datasources.hbase.types.SHCDataTypeFactory$.create(SHCDataType.scala:99)
  at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog.<init>(HBaseTableCatalog.scala:176)
  at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:293)
  at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
  at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
  ... 48 elided