hortonworks-spark / shc

The Apache Spark - Apache HBase Connector is a library to support Spark accessing HBase tables as an external data source or sink.
Apache License 2.0

Query with Where clause and In Operator returns wrong count #143

Open

PrabhuJoseph commented 7 years ago

A query with a where clause and the in operator returns the wrong count. The issue is not present in shc:1.0.0-1.6-s_2.10, but the latest 1.1.1-1.6-s_2.10 has it. Below is the repro.

HBase table and repro data:

hbase(main):005:0> create 'SHC','colfam1'
0 row(s) in 5.8380 seconds
=> Hbase::Table - SHC

hbase(main):006:0> put 'SHC','ro1','colfam1:col1','val1'
0 row(s) in 0.6730 seconds
hbase(main):021:0> put 'SHC','ro1','colfam1:col2','val2'
0 row(s) in 0.0230 seconds
hbase(main):022:0> put 'SHC','ro2','colfam1:col1','val1'
0 row(s) in 0.0220 seconds
hbase(main):024:0> put 'SHC','ro2','colfam1:col2','val2'
0 row(s) in 0.0210 seconds
hbase(main):026:0> put 'SHC','ro3','colfam1:col1','xxx'
0 row(s) in 0.0550 seconds
hbase(main):029:0> put 'SHC','ro3','colfam1:col2','xxx'
0 row(s) in 0.0250 seconds

hbase(main):030:0> scan 'SHC'
ROW  COLUMN+CELL
 ro1  column=colfam1:col1, timestamp=1496832633772, value=val1
 ro1  column=colfam1:col2, timestamp=1496832640358, value=val2
 ro2  column=colfam1:col1, timestamp=1496832653773, value=val1
 ro2  column=colfam1:col2, timestamp=1496832663550, value=val2
 ro3  column=colfam1:col1, timestamp=1496832677218, value=xxx
 ro3  column=colfam1:col2, timestamp=1496832689356, value=xxx
3 row(s) in 0.0780 seconds

hbase(main):032:0> exit

./spark-shell --packages com.hortonworks:shc-core:1.1.1-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/

import org.apache.spark.sql.execution.datasources.hbase
import org.apache.spark.sql.execution.datasources.hbase._

var cat = """{"table":{"namespace":"default", "name":"SHC","tableCoder":"PrimitiveType"},"rowkey":"key","columns":{"key":{"cf":"rowkey", "col":"key", "type":"string"},"id":{"cf":"colfam1","col":"col1","type":"string"}}}"""

val hbdf = sqlContext.read.options(Map(HBaseTableCatalog.tableCatalog -> cat)).format("org.apache.spark.sql.execution.datasources.hbase").load()

hbdf.registerTempTable("htb")

sql("select count(1) from htb").show   // returns 3

sql("select count(1) from htb where id in ('val1','xxx')").show   // returns 6

The second query is supposed to return 3, but it returned 6.
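For reference, the expected semantics of the in predicate can be checked against the repro data with plain Scala collections (a minimal sketch with illustrative names, not SHC code):

```scala
// Reference semantics for the IN predicate, using plain Scala collections.
// The rows mirror the repro data above (rowkey, id); names are illustrative.
val rows = Seq(("ro1", "val1"), ("ro2", "val1"), ("ro3", "xxx"))
val inValues = Set("val1", "xxx")

// Each row is tested once against the whole IN list, so a matching row
// contributes exactly one to the count.
val count = rows.count { case (_, id) => inValues.contains(id) }
// count == 3, which is what the SQL query above should also return.
```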

weiqingy commented 7 years ago

Thanks @PrabhuJoseph.

I just reproduced this issue. (1) I ran spark-shell with com.hortonworks:shc-core:1.1.1-1.6-s_2.10:

[ambari-qa@c6401 ~]$ spark-shell --packages com.hortonworks:shc-core:1.1.1-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/

Some commands and results:

scala> hbdf.show
+---+----+
|key|  id|
+---+----+
|ro1|val1|
|ro2|val1|
|ro3| xxx|
+---+----+

scala> hbdf.registerTempTable("htb")

scala> sql("select * from htb").show
+---+----+
|key|  id|
+---+----+
|ro1|val1|
|ro2|val1|
|ro3| xxx|
+---+----+

scala> sql("select * from htb where id in ('val1','xxx')").show
+---+----+
|key|  id|
+---+----+
|ro1|val1|
|ro2|val1|
|ro3| xxx|
|ro1|val1|
|ro2|val1|
|ro3| xxx|
+---+----+

scala> sql("select * from htb where (id == 'val1' or id == 'xxx')").show
+---+----+
|key|  id|
+---+----+
|ro1|val1|
|ro2|val1|
|ro3| xxx|
+---+----+

(2) I ran spark-shell with com.hortonworks:shc:1.0.0-1.6-s_2.10:

[ambari-qa@c6401 ~]$ spark-shell --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/

Some commands and results:

scala> hbdf.show
+---+----+
|key|  id|
+---+----+
|ro1|val1|
|ro2|val1|
|ro3| xxx|
+---+----+

scala> hbdf.registerTempTable("htb")

scala> sql("select * from htb").show
+---+----+
|key|  id|
+---+----+
|ro1|val1|
|ro2|val1|
|ro3| xxx|
+---+----+

scala> sql("select * from htb where id in ('val1','xxx')").show
+---+----+
|key|  id|
+---+----+
|ro1|val1|
|ro2|val1|
|ro3| xxx|
+---+----+

scala> sql("select * from htb where (id == 'val1' or id == 'xxx')").show
+---+----+
|key|  id|
+---+----+
|ro1|val1|
|ro2|val1|
|ro3| xxx|
+---+----+

Comparing (1) and (2), we can see that Spark has read the data from the HBase table SHC into the Spark DataFrame hbdf correctly. If you use where (id == 'val1' or id == 'xxx') instead of where id in ('val1','xxx'), the results are correct. So I think this issue may come from Spark itself or from the implementation of unhandledFilters() in SHC. I'll debug more and post the root cause soon. Let us know if you have any updates. Thanks.
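The observed output (each matching row appearing exactly once per value in the in list) is consistent with a pushdown that issues one scan per in value and concatenates the results without deduplication. A plain-Scala sketch of that pattern, as an assumed mechanism for illustration only, not SHC's actual code:

```scala
// Sketch of a scan-per-value pushdown that concatenates results without
// deduplication (assumed mechanism, for illustration; not SHC's code).
val table = Seq(("ro1", "val1"), ("ro2", "val1"), ("ro3", "xxx"))
val inValues = Seq("val1", "xxx")

// Buggy pattern: one scan per IN value, each scan evaluating the whole
// predicate, results concatenated. Every matching row then appears once
// per IN value, and nothing downstream removes the duplicates.
val buggy = inValues.flatMap(_ => table.filter { case (_, id) => inValues.contains(id) })

// Correct pattern: a single pass testing each row once against the IN list.
val correct = table.filter { case (_, id) => inValues.contains(id) }
// buggy.size == 6 and correct.size == 3, matching the observed counts.
```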

P.S. In the pom file, the Spark version for com.hortonworks:shc-core:1.1.1-1.6-s_2.10 is 1.6.3; for shc:1.0.0-1.6-s_2.10 it is 1.6.1.

weiqingy commented 7 years ago

It seems this issue was not caused by unhandledFilters() in SHC. After removing the entire implementation of unhandledFilters() from SHC, the number of records returned by the following operation is still 6:

scala> sql("select * from htb where id in ('val1','xxx')").show
+---+----+
|key|  id|
+---+----+
|ro1|val1|
|ro2|val1|
|ro3| xxx|
|ro1|val1|
|ro2|val1|
|ro3| xxx|
+---+----+

Will debug more to confirm whether this issue comes from Spark itself.

heatherandthewho commented 7 years ago

So I just ran into this with the "IN" operator, and even though I switched to syntax like "where (id == 'val1' or id == 'xxx')", the count is still wrong. When you use "df.show" the result is fine (shows 3 items), but "df.count" is wrong. Very odd.

Update: with df.collectAsList().size() or df.rdd().count() the count is right: 3.

LeiRui commented 5 years ago

I encountered the same bug. df.show() returns 4 rows while df.count() is 9.