olavloite / spanner-jdbc

JDBC Driver for Google Cloud Spanner
MIT License
38 stars 10 forks source link

issue on Spark: java.sql.SQLException: rows cannot be less than 1 #121

Closed sgireddy closed 5 years ago

sgireddy commented 5 years ago

Hi @olavloite, Thank you for implementing the open source driver for spanner. I followed the example here: https://stackoverflow.com/questions/44824968/simba-jdbc-driver-for-cloud-spanner-used-with-spark-jdbc-dataframe-reader?rq=1

The driver works perfectly for direct JDBC queries but not with spark. I got the following exception which I couldn't repro with Simba JDBC driver.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.sql.SQLException: rows cannot be less than 1 at nl.topicus.jdbc.AbstractCloudSpannerFetcher.setFetchSize(AbstractCloudSpannerFetcher.java:57) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:300) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484) at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at org.apache.spark.sql.Dataset.head(Dataset.scala:2484) at org.apache.spark.sql.Dataset.take(Dataset.scala:2698) at org.apache.spark.sql.Dataset.showString(Dataset.scala:254) at org.apache.spark.sql.Dataset.show(Dataset.scala:723) at org.apache.spark.sql.Dataset.show(Dataset.scala:682) at org.apache.spark.sql.Dataset.show(Dataset.scala:691) at com.srilabs.teststuff.SparkJdbcTest$.delayedEndpoint$com$srilabs$teststuff$SparkJdbcTest$1(SparkJdbcTest.scala:64) at com.srilabs.teststuff.SparkJdbcTest$delayedInit$body.apply(SparkJdbcTest.scala:7) at scala.Function0$class.apply$mcV$sp(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.App$class.main(App.scala:76) at com.srilabs.teststuff.SparkJdbcTest$.main(SparkJdbcTest.scala:7) at com.srilabs.teststuff.SparkJdbcTest.main(SparkJdbcTest.scala) Caused by: java.sql.SQLException: rows cannot be less than 1 at nl.topicus.jdbc.AbstractCloudSpannerFetcher.setFetchSize(AbstractCloudSpannerFetcher.java:57) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:300) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

olavloite commented 5 years ago

Fixed for version 1.1.3. The value isn't used by the driver, but is needed in order to comply with the interface definition.

olavloite commented 5 years ago

Version 1.1.3 has been released.

sgireddy commented 5 years ago

Works great! Thanks @olavloite