Closed tenstriker closed 7 years ago
Could you please elaborate your problem? Which SHC version are you using? If possible, could you share some of your code? Thanks.
I am reading hbase table and convert it to dataframe using catalog.
def catalog = s"""{ |"table":{"namespace":"/tables/", "name":"/account"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"amount":{"cf":"cf", "col":"amount", "type":"double"}, |"mgramount":{"cf":"cf", "col":"itemcount", "type":"bigint"} |} |}""".stripMargin
I have above schema which I think causes the issue. If I change all datatypes to "string" it works.
def withCatalog(cat: String) = { sparkSession .read .options(Map(HBaseTableCatalog.tableCatalog -> cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load }
val accDf = withCatalog(catalog) accDf.take(10).foreach(println) //Fails on this action
Could be because of empty column values? which is failed to be serialized as double or int.
"double", "bigint" and "String" are all supported by SHC. The error shows that SHC failed to convert a byte array whose size is 4 to a double since Bytes.toDouble(src, 0)
requires the size of src
is >= 8
. I do not have the environment (i.e your data source) to debug the code, but I guess some values of "amount" might not be double.
value of amount can be empty in datasource or it can be like 1234 without any decimal points.
e.g.
272505447 column=cf:amount, timestamp=1482882446784, value=2627
272505448 column=cf:amount, timestamp=1482882446784, value=
How did you save the data into Hbase?
By some mapreduce jobs. can you confirm if that's the cause of the issue? I'll just use string as a datatype then to avoid such issue due to data.
If you used SHC for both write and read, then empty column values
are probably not the root cause. Since you just use SHC to read the table which were saved by other tools, that might be the cause, but I can not confirm that. You should make sure the mechanism of data encode match with the mechanism of data decode. For example, when you use mapreduce jobs to write data into HBase, the mapreduce jobs convert the data from java object to HBase byte array, you need to make sure SHC able to understand that HBase byte array correctly when you use SHC to read the byte array from HBase. We suggest, if possible, you can SHC for both read and write.
Actually most columns are java Long converted to String before writing it to hbase.
I have the same case where my value in the HBase cell is in Negative i.e. -1 and it is throwing me the same error.
Hi, I am getting into same issue. Any solution or workaround. Unfortunately, we don't have control on how data gets into HBase. Appreciate any help here.
Hi, I came across a similar situation, see post on Stackoverflow for details on how to recreate and how to workaround.
Hi,
I'm also facing the same issue. I've used shc to save the data into hbase. When reading from hbase, I tried debugging and the issue is coming up only on the columns with double values (positive and negative values). I'm able to iterate over the rest of the columns using show(). I checked the hbase data of the issue column, and it only has double values, nothing else. I'm using shc version "com.hortonworks" % "shc-core" % "1.1.1-2.1-s_2.11" .
I've given a sample of the source code and hbase data below,
HBase Write,
**def customer_catalog = s"""{ |"table":{"namespace":"default", "name":"card_customer"}, |"rowkey":"cc_num", |"columns":{ |"cc_num":{"cf":"rowkey", "col":"cc_num", "type":"long"}, |"first":{"cf":"cust", "col":"first", "type":"string"}, |"last":{"cf":"cust", "col":"last", "type":"string"}, |"gender":{"cf":"cust", "col":"gender", "type":"string"}, |"street":{"cf":"cust", "col":"street", "type":"string"}, |"city":{"cf":"cust", "col":"city", "type":"string"}, |"state":{"cf":"cust", "col":"state", "type":"string"}, |"zip":{"cf":"cust", "col":"zip", "type":"int"}, |"lat":{"cf":"cust", "col":"lat", "type":"double"}, |"long":{"cf":"cust", "col":"long", "type":"double"}, |"job":{"cf":"cust", "col":"job", "type":"string"}, |"dob":{"cf":"cust", "col":"dob", "type":"string"} |} |}""".stripMargin
val customers_df = spark.read.format("csv").option("inferSchema", true).option("header", true).load(args(0))
customers_df.write.options(Map(HBaseTableCatalog.tableCatalog -> customer_catalog,HBaseTableCatalog.newTable -> "4")).format(defaultFormat).save()**
HBase Read,
**val customer_df = withCatalog(customer_catalog)
customer_df.printSchema() root |-- cc_num: long (nullable = true) |-- first: string (nullable = true) |-- last: string (nullable = true) |-- gender: string (nullable = true) |-- street: string (nullable = true) |-- city: string (nullable = true) |-- state: string (nullable = true) |-- zip: integer (nullable = true) |-- lat: double (nullable = true) |-- long: double (nullable = true) |-- job: string (nullable = true) |-- dob: string (nullable = true)
HBase Customer 'lat' column complete data,
column=cust:lat, timestamp=1589126119532, value=38.104
column=cust:lat, timestamp=1589126119532, value=39.0204
column=cust:lat, timestamp=1589126119532, value=42.7382
column=cust:lat, timestamp=1589126119532, value=43.6221
column=cust:lat, timestamp=1589126119532, value=43.0934
column=cust:lat, timestamp=1589126119532, value=42.291
column=cust:lat, timestamp=1589126119532, value=28.8652
column=cust:lat, timestamp=1589126119532, value=40.2841
column=cust:lat, timestamp=1589126119532, value=39.471
column=cust:lat, timestamp=1589126119532, value=35.0552
column=cust:lat, timestamp=1589126119532, value=32.4593
column=cust:lat, timestamp=1589126119532, value=37.4527
column=cust:lat, timestamp=1589126119532, value=38.1919
column=cust:lat, timestamp=1589126119532, value=40.9252
column=cust:lat, timestamp=1589126119532, value=41.6849
column=cust:lat, timestamp=1589126119532, value=41.8798
column=cust:lat, timestamp=1589126119532, value=37.8344
column=cust:lat, timestamp=1589126119532, value=41.0541
column=cust:lat, timestamp=1589126119532, value=39.9001
column=cust:lat, timestamp=1589126119532, value=40.6174
column=cust:lat, timestamp=1589126119532, value=40.0685
column=cust:lat, timestamp=1589126119532, value=41.8858
column=cust:lat, timestamp=1589126119532, value=40.9419
column=cust:lat, timestamp=1589126119532, value=45.9344
column=cust:lat, timestamp=1589126119532, value=41.9399
column=cust:lat, timestamp=1589126119532, value=47.2689
column=cust:lat, timestamp=1589126119532, value=41.6838
column=cust:lat, timestamp=1589126119532, value=40.1765
column=cust:lat, timestamp=1589126119532, value=36.9576
column=cust:lat, timestamp=1589126119532, value=41.5583
column=cust:lat, timestamp=1589126119532, value=42.1265
column=cust:lat, timestamp=1589126119532, value=42.1588
column=cust:lat, timestamp=1589126119532, value=34.8825
column=cust:lat, timestamp=1589126119532, value=38.6203
column=cust:lat, timestamp=1589126119532, value=41.7461
column=cust:lat, timestamp=1589126119532, value=34.156
column=cust:lat, timestamp=1589126119532, value=35.3039
column=cust:lat, timestamp=1589126119532, value=27.4295
column=cust:lat, timestamp=1589126119532, value=33.8011
column=cust:lat, timestamp=1589126119532, value=42.8135
column=cust:lat, timestamp=1589126119532, value=40.1879
column=cust:lat, timestamp=1589126119532, value=30.7766
column=cust:lat, timestamp=1589126119532, value=28.5163
column=cust:lat, timestamp=1589126119532, value=37.7917
column=cust:lat, timestamp=1589126119532, value=39.2502
column=cust:lat, timestamp=1589126119532, value=36.6876
column=cust:lat, timestamp=1589126119532, value=40.6321
column=cust:lat, timestamp=1589126119532, value=44.769
column=cust:lat, timestamp=1589126119532, value=44.9039
column=cust:lat, timestamp=1589126119532, value=44.9715**
No issue when,
**customer_df.select("first","last","gender","street","city","state","zip","job","dob").show()
+---------+---------+------+--------------------+--------------+-----+---------+--------------------+-------------------+ | first| last|gender| street| city|state| zip| job| dob| +---------+---------+------+--------------------+--------------+-----+---------+--------------------+-------------------+ | Melissa| James| F| 537 Bryant Mall| Salt Lick| KY|875574071|Psychologist, for...|1956-07-19 14:30:00| | John| Holland| M|630 Christina Harbor| Zephyr Cove| NV|943273012|Geophysical data ...|1949-12-28 13:30:00| | James|Rodriguez| M| 95514 Andrew Street| Elk Point| SD|892809266| Chiropractor|1953-07-28 14:30:00| | Maurice| Simon| M|031 Jessica Harbo...| Caledonia| MN|892680498|Hydrographic surv...|1974-11-03 13:30:00| | Kevin| Martin| M|40514 Diana Expre...| Savannah| NY|825438516|Scientist, physio...|1973-07-07 14:30:00| | Debra| Davis| F| 566 Reed Well| Canton| MI|876097848|Teacher, special ...|1998-03-01 13:30:00| | John| Brown| M|79481 Potter Vill...| Francitas| TX|926366006|Engineer, civil (...|1973-08-13 14:30:00| | Monica| Brown| F| 39422 Chloe Court| Myers Flat| CA|959788341|Designer, fashion...|1948-01-13 13:30:00|**
Issue when adding lat or long column,
**customer_df.select("first","last","gender","street","city","state","zip","job","dob","lat").show()
java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 6 at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:631) at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:605) at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:729) at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:720) at org.apache.spark.sql.execution.datasources.hbase.types.PrimitiveType.fromBytes(PrimitiveType.scala:33) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$4.apply(HBaseTableScan.scala:107) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$4.apply(HBaseTableScan.scala:99) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD.buildRow(HBaseTableScan.scala:99) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:189) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:170) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)**
Kindly suggest if anything is missed or wrong.
Thanks, Venkatesh Raman
I am also facing the same issue
at org.apache.hadoop.hbase.mapreduce.TsvImporterMapper.map(TsvImporterMapper.java:200) at org.apache.hadoop.hbase.mapreduce.TsvImporterMapper.map(TsvImporterMapper.java:46) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168) Caused by: java.lang.IllegalArgumentException: Wrong length: 10, expected 8 at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:839) at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:813) at org.apache.hadoop.hbase.mapreduce.ImportTsv$TsvParser$ParsedLine.getCellTTL(ImportTsv.java:393) at org.apache.hadoop.hbase.mapreduce.TsvImporterMapper.map(TsvImporterMapper.java:164) ... 9 more
Any workaround for this?
Thanks Shankar
I am getting following error when using bigint, long or double datatypes. It runs if I use string. Also document says it supports Java primitive types but the examples have bigint, tinyint, smallint which are not java types.
Caused by: java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 4 at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:631) at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:605) at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:729) at org.apache.spark.sql.execution.datasources.hbase.Utils$.hbaseFieldToScalaType(Utils.scala:51) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$4.apply(HBaseTableScan.scala:123) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$4.apply(HBaseTableScan.scala:114) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD.buildRow(HBaseTableScan.scala:114) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:205) at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:186) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)