housepower / ClickHouse-Native-JDBC

ClickHouse Native Protocol JDBC implementation
https://housepower.github.io/ClickHouse-Native-JDBC/
Apache License 2.0

java.lang.ClassCastException: [B cannot be cast to [Ljava.lang.Object; #464

Closed. maxim-lixakov closed this issue 1 week ago.

maxim-lixakov commented 1 month ago

Environment

I found that if I implement a custom mapping in getCatalystType that maps Int32 not to ShortType but to StringType, the column is read into strings successfully; with ShortType, however, I hit the error below. Is there any way to fix this while keeping the original data structure?
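
For context, a minimal sketch of the kind of dialect mapping described here (the dialect object, the canHandle URL prefix, and the exact typeName string matched are assumptions for illustration, not the reporter's actual code):

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types._

// Hypothetical dialect sketch: maps a ClickHouse Array(Int32) column to a
// Spark Catalyst type. Per the report, mapping to ArrayType(ShortType)
// triggers the ClassCastException below, while StringType reads succeed.
object ClickHouseShortArrayDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:clickhouse:")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
    typeName match {
      case "Array(Int32)" => Some(ArrayType(ShortType, containsNull = false)) // fails
      // case "Array(Int32)" => Some(StringType)                              // works
      case _ => None
    }
}

JdbcDialects.registerDialect(ClickHouseShortArrayDialect)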

Error logs

Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.1.69 executor driver): java.lang.ClassCastException: [B cannot be cast to [Ljava.lang.Object;
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$29(JdbcUtils.scala:544)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$31(JdbcUtils.scala:550)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.nullSafeConvert(JdbcUtils.scala:560)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$30(JdbcUtils.scala:550)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$30$adapted(JdbcUtils.scala:547)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:358)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:340)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:27)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3573)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3573)
    at ru.mts.doetl.sparkdialectextensions.ClickhouseDialectTest.$anonfun$new$13(ClickhouseDialectTest.scala:190)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
    at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
    at ru.mts.doetl.sparkdialectextensions.ClickhouseDialectTest.org$scalatest$BeforeAndAfterEach$$super$runTest(ClickhouseDialectTest.scala:12)
    at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
    at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
    at ru.mts.doetl.sparkdialectextensions.ClickhouseDialectTest.runTest(ClickhouseDialectTest.scala:12)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
    at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
    at org.scalatest.Suite.run(Suite.scala:1114)
    at org.scalatest.Suite.run$(Suite.scala:1096)
    at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
    at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
    at ru.mts.doetl.sparkdialectextensions.ClickhouseDialectTest.org$scalatest$BeforeAndAfterAll$$super$run(ClickhouseDialectTest.scala:12)
    at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    at ru.mts.doetl.sparkdialectextensions.ClickhouseDialectTest.run(ClickhouseDialectTest.scala:12)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
    at org.scalatest.tools.Runner$.run(Runner.scala:798)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
Caused by: java.lang.ClassCastException: [B cannot be cast to [Ljava.lang.Object;
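
For readers unfamiliar with JVM array descriptors: [B is a primitive byte[] and [Ljava.lang.Object; is Object[]. A primitive array is not an Object[], so the cast in JdbcUtils fails; the driver's java.sql.Array.getArray() appears to return a primitive array here, which Spark then tries to cast to Object[]. The same cast failure can be reproduced outside Spark in two lines:

// A primitive array cannot be cast to a boxed Object[].
val raw: Any = Array[Byte](1, 2, 3)  // runtime class: [B
raw.asInstanceOf[Array[AnyRef]]      // throws java.lang.ClassCastException:
                                     // [B cannot be cast to [Ljava.lang.Object;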

Steps to reproduce

I have a table in ClickHouse with the following DDL (you can create it manually):

create table if not exists default.`6XW2L4fomO`
(
    arrayColumn Array(Int32)
)
    engine = MergeTree ORDER BY tuple()
        SETTINGS index_granularity = 8192;

Insert some data:

INSERT INTO default.`6XW2L4fomO` (arrayColumn) VALUES ([255, 0, 128, 64, 32]);

Now I want to read it via Spark:

val df = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", tableName)
  .load()

assert(df.schema.fields.head.dataType === ArrayType(ShortType, containsNull = false))
// passes

// causes the error:
val data = df.collect().map(_.getAs[Seq[Short]]("arrayColumn")).head
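
A possible workaround consistent with the observation above (only a sketch: it assumes the dialect maps the column to StringType and that the values arrive as ClickHouse array literals like [255,0,128,64,32]):

import org.apache.spark.sql.functions._

// Strip the brackets and whitespace, split on commas, and cast the resulting
// array<string> back to array<short> on the Spark side.
val parsed = df.withColumn(
  "arrayColumn",
  split(regexp_replace(col("arrayColumn"), "[\\[\\]\\s]", ""), ",").cast("array<short>")
)
val shorts = parsed.collect().map(_.getAs[Seq[Short]]("arrayColumn")).head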

Other descriptions