hortonworks-spark / shc

The Apache Spark - Apache HBase Connector is a library to support Spark accessing HBase table as external data source or sink.
Apache License 2.0

How to write a Dataframe to hbase table through Java using SHC? #282

Open rashid-1989 opened 6 years ago

rashid-1989 commented 6 years ago

Hi, I am using the code below to write a DataFrame to an HBase table:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog;

public class connTest {

    static String hbaseCatalog = "{\r\n"
            + "\"table\":{\"namespace\":\"NewSample\", \"name\":\"NewSample:temp\", \"tableCoder\":\"PrimitiveType\"},\r\n"
            + "\"rowkey\":\"id1\",\r\n"
            + "\"columns\":{\r\n"
            + "\"ID1\":{\"cf\":\"rowkey\", \"col\":\"id1\", \"type\":\"string\"},\r\n"
            + "\"DEPARTMENT\":{\"cf\":\"general\", \"col\":\"department\", \"type\":\"string\"},\r\n"
            + "\"EMAIL_COUNT\":{\"cf\":\"general\", \"col\":\"emailCount\", \"type\":\"string\"},\r\n"
            + "\"REGION\":{\"cf\":\"general\", \"col\":\"region\", \"type\":\"string\"}\r\n"
            + "}\r\n" + "}";

    private static Dataset<org.apache.spark.sql.Row> withCatalog(String catalog) {
        SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
                .config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
                .config("hbase.client.retries.number", "2")
                .getOrCreate();
        SQLContext sqlContext = new SQLContext(spark);
        Map<String, String> map = new HashMap();
        map.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<org.apache.spark.sql.Row> df = sqlContext.read().options(map)
                .format("org.apache.spark.sql.execution.datasources.hbase").load();
        df.show();
        return df;
        // df.show();
    }

    String newHbaseCatalog = "{\r\n"

And I am getting the exception below:

Exception in thread "main" java.util.NoSuchElementException: key not found: catalog
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.default(CaseInsensitiveMap.scala:28)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.apply(CaseInsensitiveMap.scala:28)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:185)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:162)
        at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:57)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:64)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:61)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:84)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:609)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:609)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

rashid-1989 commented 6 years ago

@weiqingy Could you please suggest if I am missing something here?

louisliu318 commented 6 years ago

I came across the same issue.

rashid-1989 commented 6 years ago

Could you resolve it?

louisliu318 commented 6 years ago

can you attach your latest code?

rashid-1989 commented 6 years ago

Please refer to the snippet above, followed by the exception. That's the code I am trying to execute.

rashid-1989 commented 6 years ago

@louisliu318 do you need any additional details on the above?

rashid-1989 commented 6 years ago

I am stuck here. Can someone please suggest?

HulkSun commented 5 years ago

Same error in Scala:

        at scala.None$.get(Option.scala:347)
        at scala.None$.get(Option.scala:345)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:277)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:60)
        at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(DefaultSource.scala:24)
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
        at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource$.main(HBaseSource.scala:107)
        at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource.main(HBaseSource.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:744)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

TEray commented 5 years ago

Same error!

TEray commented 5 years ago

Map<String, String> putMap = new HashMap();
map.put(HBaseTableCatalog.tableCatalog(), newHbaseCatalog);
map.put(HBaseTableCatalog.newTable(), "1");

df.write().options(putMap)
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .save();

You should change putMap to map, @rashid-1989: the catalog is put into map, but the empty putMap is what gets passed to options(), so the writer never sees the catalog key and fails with "key not found: catalog".
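Putting the pieces together, here is a minimal sketch of the corrected write path in Java. It assumes the df, newHbaseCatalog, and SHC format string from the snippets above; the only thing that matters for the "key not found: catalog" error is that the map handed to options() is the same map the catalog was put into. The number of regions for a new table ("5" below) is illustrative.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog;

// ... inside the same class, after df and newHbaseCatalog are defined ...

// Build the writer options in one map and pass that same map to options().
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put(HBaseTableCatalog.tableCatalog(), newHbaseCatalog);
// Only needed if SHC should create the target table; the value is the
// number of regions to pre-split it into (adjust as needed).
writeOptions.put(HBaseTableCatalog.newTable(), "5");

df.write()
        .options(writeOptions)
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .save();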