apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Cast from timestamp to decimal causes an exception #1037

Open andygrove opened 1 week ago

andygrove commented 1 week ago

Describe the bug

SQL

SELECT c48, cast(c48 as DECIMAL(10,2)), try_cast(c48 as DECIMAL(10,2)) FROM test1 ORDER BY c48;
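
(A minimal reproduction sketch, not from the original report: it assumes a Comet-enabled session and stands up a one-row test1 table with a timestamp column c48. All names and config keys here are illustrative and may differ by version; whether the native plan is exercised can also depend on the data source, and the fuzz harness in the trace below reads generated files.)

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

// Assumes Comet is on the classpath and enabled for this session
// (e.g. via spark.plugins / spark.comet.enabled; keys may vary by version).
val spark = SparkSession.builder().appName("ts-to-decimal-repro").getOrCreate()
import spark.implicits._

// Stand-in for the fuzzer's test1 table: one timestamp column named c48.
Seq(Timestamp.valueOf("2024-01-01 12:00:00"))
  .toDF("c48")
  .createOrReplaceTempView("test1")

// The failing query: with Comet's native cast active this raised
// CometNativeException; plain Spark evaluates the cast itself (seconds since
// the epoch, or NULL / an error when the value does not fit in DECIMAL(10,2),
// depending on ANSI mode).
spark.sql(
  """SELECT c48,
    |       cast(c48 as DECIMAL(10,2)),
    |       try_cast(c48 as DECIMAL(10,2))
    |FROM test1 ORDER BY c48""".stripMargin
).show()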

[ERROR] Query failed in Comet: Job aborted due to stage failure: Task 1 in stage 450.0 failed 4 times, most recent failure: Lost task 1.3 in stage 450.0 (TID 960) (10.0.0.118 executor 3): org.apache.comet.CometNativeException: InternalError: Native cast invoked for unsupported cast from Timestamp(Microsecond, Some("America/Denver")) to Decimal128(10, 2). (full stack trace below)

Driver stacktrace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 450.0 failed 4 times, most recent failure: Lost task 1.3 in stage 450.0 (TID 960) (10.0.0.118 executor 3): org.apache.comet.CometNativeException: InternalError: Native cast invoked for unsupported cast from Timestamp(Microsecond, Some("America/Denver")) to Decimal128(10, 2).
    at org.apache.comet.Native.executePlan(Native Method)
    at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:107)
    at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1$adapted(CometExecIterator.scala:106)
    at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:157)
    at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:106)
    at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:118)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:41)
    at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:322)
    at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:320)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:908)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:908)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
    at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:320)
    at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:187)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec$.prepareJVMShuffleDependency(CometShuffleExchangeExec.scala:319)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.shuffleDependency$lzycompute(CometShuffleExchangeExec.scala:159)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.shuffleDependency(CometShuffleExchangeExec.scala:138)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(CometShuffleExchangeExec.scala:112)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.mapOutputStatisticsFuture(CometShuffleExchangeExec.scala:108)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
    at org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.submitShuffleJob(CometShuffleExchangeExec.scala:63)
    at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:181)
    at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:181)
    at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:183)
    at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:272)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:270)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:270)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:242)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:387)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:360)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3459)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4208)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4206)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3459)
    at org.apache.comet.fuzz.QueryRunner$.$anonfun$runQueries$3(QueryRunner.scala:70)
    at org.apache.comet.fuzz.QueryRunner$.$anonfun$runQueries$3$adapted(QueryRunner.scala:58)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.comet.fuzz.QueryRunner$.runQueries(QueryRunner.scala:58)
    at org.apache.comet.fuzz.Main$.main(Main.scala:77)
    at org.apache.comet.fuzz.Main.main(Main.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.comet.CometNativeException: InternalError: Native cast invoked for unsupported cast from Timestamp(Microsecond, Some("America/Denver")) to Decimal128(10, 2).
    at org.apache.comet.Native.executePlan(Native Method)
    at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:107)
    at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1$adapted(CometExecIterator.scala:106)
    at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:157)
    at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:106)
    at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:118)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:41)
    at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:322)
    at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:320)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:908)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:908)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

andygrove commented 1 week ago

This is a low-priority edge case because it is very unlikely that users would want to cast a timestamp to a decimal.
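
A possible (untested) workaround until this is addressed is to keep the cast off Comet's native path, either by rewriting the expression or by disabling Comet for the affected query. A sketch, reusing the session and test1 view from the reproduction above (the config key is assumed to be the top-level Comet switch; verify for your version):

// Untested workaround sketches, not from the report.

// 1. Widen to a numeric type first, then narrow to the target decimal.
//    Spark's timestamp-to-double cast yields (fractional) seconds since the epoch.
val rewritten = spark.sql(
  "SELECT c48, cast(cast(c48 as DOUBLE) as DECIMAL(10,2)) FROM test1 ORDER BY c48")

// 2. Or let Spark evaluate the whole query by turning Comet off for it
//    (spark.comet.enabled is assumed here to be the top-level switch).
spark.conf.set("spark.comet.enabled", "false")
val viaSpark = spark.sql(
  "SELECT c48, cast(c48 as DECIMAL(10,2)) FROM test1 ORDER BY c48")
spark.conf.set("spark.comet.enabled", "true")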