hortonworks-spark / shc

The Apache Spark - Apache HBase Connector (SHC) is a library that lets Spark access HBase tables as an external data source or sink.
Apache License 2.0

AVRO schema record conversion fails with empty records: NoSuchElementException #142

Open · troyjr opened this issue 7 years ago

troyjr commented 7 years ago

I have defined an AVRO schema record as follows (pyspark):

    schema_record = """{
        "namespace": "example.avro",
        "type": "record",
        "name": "Fields",
        "fields": [
            {"name": "codename", "type": "string"},
            {"name": "content_type", "type": ["long", "null"]},
            {"name": "name", "type": ["string", "null"]}
        ]
    }"""

catalog = """{ "table":{"namespace":"default", "name":"htable4"}, "rowkey":"key1", "columns":{ "pk":{"cf":"rowkey", "col":"key1", "type":"long"}, "fields":{"cf":"cf1", "col":"fields", "avro":"schema_record"}, "model":{"cf":"cf1", "col":"model", "type":"string"} } }"""

For the fields struct, my source DataFrame (from MongoDB) has null fields which are simply dropped when the DataFrame is loaded.

I would like the AVRO schema conversion to honor these "missing" fields and just treat them as nulls.

This is to support agile development of MongoDB <-> HBase and schema evolution with AVRO.

When I attempt to load data with the missing fields, however, I get a NoSuchElementException.

Example code below (working):

    data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'

    examplejson = ["""{
        "fields": {
            "codename": "the codename",
            "content_type": 123421341234,
            "name": "the name"
        },
        "model": "the model",
        "pk": 1234567890
    }"""]

    mydf = spark.read.json(sc.parallelize(examplejson))
    mydf.printSchema()
    mydf.show()

schema_record = """{"namespace": "example.avro",

 "type": "record",      "name": "Fields",
     "fields": [      {"name": "codename", "type": "string"},
       {"name": "content_type",  "type": ["long", "null"]},
         {"name": "name", "type": ["string", "null"]}      ]    }"""

catalog = """{ "table":{"namespace":"default", "name":"htable4"}, "rowkey":"key1", "columns":{ "pk":{"cf":"rowkey", "col":"key1", "type":"long"}, "fields":{"cf":"cf1", "col":"fields", "avro":"schema_record"}, "model":{"cf":"cf1", "col":"model", "type":"string"} } }"""

    mydf.write.options(schema_record=schema_record, newTable="5", catalog=catalog) \
        .format(data_source_format).save()

    hbasedf = spark.read.options(schema_record=schema_record, catalog=catalog) \
        .format(data_source_format).load()
    hbasedf.printSchema()
    hbasedf.show()

Python example code below (NOT working):

    data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'

    examplejson = ["""{
        "fields": {
            "codename": "the codename",
            "content_type": 123421341234
        },
        "model": "the model",
        "pk": 1234567890
    }"""]

    mydf = spark.read.json(sc.parallelize(examplejson))
    mydf.printSchema()
    mydf.show()

schema_record = """{"namespace": "example.avro",

 "type": "record",      "name": "Fields",
     "fields": [      {"name": "codename", "type": "string"},
       {"name": "content_type",  "type": ["long", "null"]},
         {"name": "name", "type": ["string", "null"]}      ]    }"""

catalog = """{ "table":{"namespace":"default", "name":"htable4"}, "rowkey":"key1", "columns":{ "pk":{"cf":"rowkey", "col":"key1", "type":"long"}, "fields":{"cf":"cf1", "col":"fields", "avro":"schema_record"}, "model":{"cf":"cf1", "col":"model", "type":"string"} } }"""

    mydf.write.options(schema_record=schema_record, newTable="5", catalog=catalog) \
        .format(data_source_format).save()

    hbasedf = spark.read.options(schema_record=schema_record, catalog=catalog) \
        .format(data_source_format).load()
    hbasedf.printSchema()
    hbasedf.show()

The stack trace is:

    17/06/07 01:13:32 WARN TaskSetManager: Lost task 3.0 in stage 9.0 (TID 22, ip-172-31-30-178.ec2.internal, executor 1): java.util.NoSuchElementException: next on empty iterator
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
        at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
        at org.apache.spark.sql.execution.datasources.hbase.types.SchemaConverters$$anonfun$createConverterToAvro$7.apply(Avro.scala:304)
        at org.apache.spark.sql.execution.datasources.hbase.types.Avro.toBytes(Avro.scala:56)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:221)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:217)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1(HBaseRelation.scala:217)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$insert$1$$anonfun$apply$4.apply(HBaseRelation.scala:229)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$insert$1$$anonfun$apply$4.apply(HBaseRelation.scala:229)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1124)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1123)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

I've taken a look at https://github.com/hortonworks-spark/shc/blob/master/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/types/Avro.scala#L304, and it looks as though the line

    record.put(fieldNamesIterator.next(), converter(rowIterator.next()))

does not check whether the rowIterator has another element, i.e. it never calls hasNext.

What would fix my problem is first checking whether rowIterator.hasNext is true, and if it is not, simply putting a null value, e.g.:

    record.put(fieldNamesIterator.next(), null)
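In the meantime, a possible client-side workaround (just a sketch of my own, not something SHC provides) is to rebuild the fields struct before writing so that every field declared in the Avro schema is present, padding the missing optional field with an explicit typed null:

    from pyspark.sql import functions as F

    # Workaround sketch (not an SHC fix): make the Row shape match the Avro
    # record by adding the absent optional "name" field as a typed null.
    mydf_padded = mydf.withColumn(
        "fields",
        F.struct(
            F.col("fields.codename").alias("codename"),
            F.col("fields.content_type").alias("content_type"),
            F.lit(None).cast("string").alias("name"),  # missing in the source JSON
        ),
    )

    mydf_padded.write.options(schema_record=schema_record, newTable="5", catalog=catalog) \
        .format(data_source_format).save()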

I'm using version com.hortonworks:shc-core:1.1.1-2.1-s_2.11, Spark 2.1.1, HBase 1.3.0, and PySpark with Python 2.7.12, and have also had the same issue with the spark-shell.

The command line for testing this within pyspark is:

    pyspark --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/

weiqingy commented 7 years ago

@troyjr Sorry for the late reply. SHC is a library to support Spark accessing HBase tables, so it should work no matter what language you use to write your application.

Based on the stack trace, the error happened when saving data to HBase. Could you also try the procedure here? In step 3, "Save DataFrame", it uses Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"); please pay attention to the catalog definition. In that example, the catalog definition used for saving data is different from the catalog used for loading data.
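In pyspark that save step would look roughly like this (a sketch only; I'm assuming the string option keys "catalog" and "newtable" that those HBaseTableCatalog constants resolve to):

    mydf.write.options(catalog=catalog, newtable="5") \
        .format("org.apache.spark.sql.execution.datasources.hbase") \
        .save()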

sa255304 commented 7 years ago

For the security credentials manager, the main page provides the following configuration:

    spark.hbase.connector.security.credentials    ambari-qa-c1@EXAMPLE.COM
    spark.hbase.connector.security.keytab         /etc/security/keytabs/smokeuser.headless.keytab

Which security keytab do I need to provide?

I have three types of security keytabs and principals:

1. hbase.headless.keytab
2. spark.headless.keytab
3. a normal keytab for my project account, like PROJECTNAME.keytab

I am facing this authentication issue with the old way of connecting to HBase as well.
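For context, this is how I would expect to pass those properties on the command line (a sketch only; the principal and keytab path here are the smokeuser values from the example above, not necessarily the right ones for my project):

    pyspark --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 \
      --repositories http://repo.hortonworks.com/content/groups/public/ \
      --conf spark.hbase.connector.security.credentials=ambari-qa-c1@EXAMPLE.COM \
      --conf spark.hbase.connector.security.keytab=/etc/security/keytabs/smokeuser.headless.keytab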

sa255304 commented 7 years ago

Also, the Hortonworks blog for the HBase connector gives a different package name to use:

    pyspark --packages zhzhan:shc:0.0.11-1.6.1-s_2.10

versus:

    pyspark --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories

Which of the two is the correct jar to use?