pingcap / tikv-client-lib-java

TiKV Java client library
Apache License 2.0

Time type was not implemented in decode logic #123

Open birdstorm opened 6 years ago

birdstorm commented 6 years ago

The Time type has DURATION_FLAG, but the flag is not handled during decoding, which causes decode errors in the columns that follow. We should implement a new duration class so the Time type is handled correctly.
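To illustrate the failure mode, here is a minimal sketch of a flag-prefixed row decoder. Everything in it is an assumption made for illustration: the class `DurationDecodeSketch`, the flag values, and the plain big-endian 8-byte payloads are hypothetical and are not the actual codec of this library. It only shows why a duration column needs its own decoder that consumes its flag and payload before the next column is read.

```java
import java.nio.ByteBuffer;

final class DurationDecodeSketch {
    // Hypothetical flag values, chosen for illustration only.
    static final int INT_FLAG = 3;
    static final int DURATION_FLAG = 7;

    /** Minimal stand-in for a duration value: signed nanoseconds. */
    static final class Duration {
        final long nanos;
        Duration(long nanos) { this.nanos = nanos; }
    }

    /** Reads one flag byte and fails if it is not the expected one. */
    static void expectFlag(ByteBuffer in, int expected, String typeName) {
        int flag = in.get() & 0xFF;
        if (flag != expected) {
            throw new IllegalStateException("Invalid Flag type for " + typeName + ": " + flag);
        }
    }

    /** Consumes the duration flag and its 8-byte payload. */
    static Duration decodeDuration(ByteBuffer in) {
        expectFlag(in, DURATION_FLAG, "DurationType");
        return new Duration(in.getLong());
    }

    /** Consumes the integer flag and an 8-byte payload (simplified timestamp). */
    static long decodeTimestamp(ByteBuffer in) {
        expectFlag(in, INT_FLAG, "TimestampType");
        return in.getLong();
    }

    /** Builds a two-column row shaped like t3: (time, timestamp). */
    static ByteBuffer encodeRow(long durationNanos, long timestamp) {
        ByteBuffer out = ByteBuffer.allocate(18);
        out.put((byte) DURATION_FLAG).putLong(durationNanos);
        out.put((byte) INT_FLAG).putLong(timestamp);
        out.flip();
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer row = encodeRow(41_000_000_000L, 1509062441L);
        Duration c1 = decodeDuration(row); // consumes flag + payload of column 1
        long c2 = decodeTimestamp(row);    // now positioned at column 2's flag
        System.out.println(c1.nanos + " " + c2);
    }
}
```

In this sketch, calling `decodeTimestamp` on a fresh row without first decoding the duration column throws, because the timestamp decoder is handed the first column's flag byte.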

zhexuany commented 6 years ago

The context: create the schema and data with the following statements:

create table t2(c1 timestamp not null);
create table t3(c1 time not null, c2 timestamp not null);
insert into t2 values(now());
insert into t3 values(time(now()), now());

TiDB has the following behavior:

mysql> select * from t2;
+---------------------+
| c1                  |
+---------------------+
| 2017-10-27 00:00:41 |
+---------------------+
1 row in set (0.00 sec)

As we can see, this query works perfectly fine in TiDB. It also works on the TiSpark side:

spark.sql("select * from t2").show;
+--------------------+
|                  c1|
+--------------------+
|2017-10-27 00:00:...|
+--------------------+

However, spark.sql("select * from t3").show; throws a lot of exceptions like this one:

com.pingcap.tikv.codec.InvalidCodecFormatException: Invalid Flag type for TimestampType: 8
    at com.pingcap.tikv.types.TimestampType.decodeNotNull(TimestampType.java:61)
    at com.pingcap.tikv.types.DataType.decodeValueToRow(DataType.java:124)
    at com.pingcap.tikv.row.DefaultRowReader.readRow(DefaultRowReader.java:38)
    at com.pingcap.tikv.operation.SelectIterator.next(SelectIterator.java:131)
    at com.pingcap.tikv.operation.SelectIterator.next(SelectIterator.java:42)
    at com.pingcap.tispark.TiRDD$$anon$1.next(TiRDD.scala:70)
    at com.pingcap.tispark.TiRDD$$anon$1.next(TiRDD.scala:50)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

ilovesoup commented 6 years ago

There are two parts to this problem:

  1. Decoding should be correct.
  2. Encoding for the coprocessor should be correct as well. This matters when predicates on time/duration types are pushed down to the coprocessor. Please make sure both work.
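One way to check both directions together is a round-trip test: whatever the encoder writes for a duration, the decoder should read back unchanged. The sketch below assumes a hypothetical class `DurationRoundTripSketch`, a made-up flag value, and a plain 8-byte big-endian payload. The real wire format may differ, so treat this as the shape of the test rather than the actual encoding.

```java
import java.nio.ByteBuffer;

final class DurationRoundTripSketch {
    // Hypothetical flag value, for illustration only.
    static final int DURATION_FLAG = 7;

    /** Writes the flag byte followed by the duration as signed nanoseconds. */
    static byte[] encode(long nanos) {
        return ByteBuffer.allocate(9).put((byte) DURATION_FLAG).putLong(nanos).array();
    }

    /** Reads the flag byte, validates it, then reads the payload. */
    static long decode(byte[] bytes) {
        ByteBuffer in = ByteBuffer.wrap(bytes);
        int flag = in.get() & 0xFF;
        if (flag != DURATION_FLAG) {
            throw new IllegalArgumentException("Invalid flag for duration: " + flag);
        }
        return in.getLong();
    }

    /** True when decoding the encoded value yields the original value. */
    static boolean roundTrips(long nanos) {
        return decode(encode(nanos)) == nanos;
    }
}
```

Negative values are worth including in such a test, since a MySQL TIME value can be negative.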