ClickHouse / spark-clickhouse-connector

Spark ClickHouse Connector built on the DataSourceV2 API
https://clickhouse.com/docs/en/integrations/apache-spark
Apache License 2.0

Support writing with functions in distribute/partition expressions #253


Yxang commented 1 year ago

What does this PR do?

This PR introduces support for writing to ClickHouse tables whose distribution/partition expressions contain functions, building on Spark 3.4's SPARK-39607. It requires implementing the relevant ClickHouse functions as corresponding Spark V2 functions.

The implemented functions in this PR include:

The existing v2 function for xxHash64 is also available.

Corresponding tests are also included.
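
For orientation, here is a minimal sketch of the pattern this relies on: a ClickHouse sharding function such as toYYYYMM exposed through Spark's DataSourceV2 function API as an UnboundFunction that binds to a ScalarFunction. This is not the PR's actual code; the object name and the yyyyMM-as-Int result convention are illustrative assumptions.

    import java.time.LocalDate

    import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
    import org.apache.spark.sql.types._

    object ClickHouseMonthsSketch extends UnboundFunction with ScalarFunction[Int] {
      override def name: String = "clickhouse_months"
      override def canonicalName: String = s"clickhouse.$name"
      override def description: String = s"$name(date) -> int: yyyyMM, mirroring ClickHouse toYYYYMM"

      // Accept exactly one DATE column; anything else is rejected at bind time,
      // which is the source of the fail-fast behavior discussed below.
      override def bind(inputType: StructType): BoundFunction = inputType.fields match {
        case Array(StructField(_, DateType, _, _)) => this
        case other => throw new UnsupportedOperationException(
          s"Expect 1 DATE argument, got ${other.mkString(", ")}")
      }

      override def inputTypes(): Array[DataType] = Array(DateType)
      override def resultType(): DataType = IntegerType

      // Spark stores DateType internally as days since the Unix epoch (Int);
      // "invoke" is the magic method Spark binds to in codegen and interpreted mode.
      def invoke(days: Int): Int = {
        val d = LocalDate.ofEpochDay(days.toLong)
        d.getYear * 100 + d.getMonthValue
      }
    }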

Limitations

The main limitation is that the Spark functions cannot fully match the ClickHouse functions' behavior, including:

Yxang commented 1 year ago

I want to provide more information on the last limitation, that the time functions support only one input type. I tried but failed to use automatic type casting via UnboundFunction.bind(StructType): it does not work when the function is used in requiredDistribution in RequiresDistributionAndOrdering. The problem can be reproduced by the following steps:

  1. Uncomment line 38 of xenon/clickhouse/func/clickhouse/Months.scala: case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
  2. Build the dev Docker environment. The Spark version needs to be set to 3.4.0, and you may need to manually compile iceberg-spark-runtime and add the jar to the Spark package.
  3. Run this SQL in ClickHouse:
    create database if not exists test on cluster single_replica;
    create or replace table test.test_month_shard on cluster single_replica
    (
    create_time timestamp,
    create_date date,
    value       String
    )
    engine = MergeTree()
        order by create_time;
    create or replace table test.test_month on cluster single_replica
    as test.test_month_shard
        engine = Distributed(single_replica, 'test', 'test_month_shard', toYYYYMM(create_date));
  4. Run this SQL in Spark. It will fail with org.apache.spark.SparkException: [INTERNAL_ERROR] A method named "invoke" is not declared in any enclosing class nor any supertype:
    insert into clickhouse_s1r1.test.test_month
    values
    (timestamp'2021-01-01 10:10:10', timestamp'2021-01-01 10:10:10', '1'),
    (timestamp'2022-02-02 10:10:10', timestamp'2022-02-02 10:10:10', '2'),
    (timestamp'2023-03-03 10:10:10', timestamp'2023-03-03 10:10:10', '3'),
    (timestamp'2024-04-04 10:10:10', timestamp'2024-04-04 10:10:10', '4') AS tab(create_time, create_date, value)
  5. If we instead create the ClickHouse Distributed table with toYYYYMM(create_time) and insert timestamp-typed data from Spark, it fails with the same message as step 4.
  6. Directly running the UDF succeeds:
    use clickhouse_s1r1.test;
    select clickhouse_months(timestamp'2024-04-04 10:10:10')

So it seems this functionality is broken when the function is used via requiredDistribution in RequiresDistributionAndOrdering. I suppose this is a problem on Spark's side?

I believe adding a type cast before calling the UDF would always help, because we do not know the inserted data type in advance. However, this is also not achievable at present, because requiredDistribution does not handle casting, though it could be modified slightly to support it.
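
For reference, the write-side requirement being discussed looks roughly like the following hedged sketch (the class name and column are placeholders, not the connector's actual code). Spark resolves the named function through the catalog, shuffles rows by its result before writing, and injects a Cast when the column type differs from the bound input type; that injected Cast is where the failure above occurs.

    import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
    import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortOrder}
    import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}

    // Ask Spark to cluster incoming rows by clickhouse_months(create_date),
    // mirroring the Distributed table's sharding key toYYYYMM(create_date).
    class MonthShardedWrite extends Write with RequiresDistributionAndOrdering {
      override def requiredDistribution(): Distribution =
        Distributions.clustered(Array[Expression](
          Expressions.apply("clickhouse_months", Expressions.column("create_date"))))

      override def requiredOrdering(): Array[SortOrder] = Array.empty
    }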

pan3793 commented 1 year ago

@Yxang awesome! will take a look soon.

pan3793 commented 1 year ago

Some lessons I learned from my last investigation in this area: the hash result depends on two things, 1) the algorithm, and 2) the memory layout of the input data.

UTF8 strings have a consistent memory layout, but integers do not; their layout mostly depends on the hardware (and sometimes on the OS platform and virtual machine, e.g. the JVM). Even a single ClickHouse cluster cannot work properly on machines with mixed CPU architectures.
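
To make the layout point concrete, here is a small illustration (assuming the third-party zero-allocation-hashing library for xxHash64; ClickHouse hashes an Int64's little-endian bytes): the same value hashed over two byte layouts lands on different shards even though the algorithm is identical.

    import java.nio.{ByteBuffer, ByteOrder}

    import net.openhft.hashing.LongHashFunction // assumption: zero-allocation-hashing library

    val v = 202101L
    // Serialize the same Int64 with two different byte orders.
    val littleEndian = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(v).array()
    val bigEndian    = ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putLong(v).array()

    val h1 = LongHashFunction.xx().hashBytes(littleEndian)
    val h2 = LongHashFunction.xx().hashBytes(bigEndian)
    assert(h1 != h2) // same algorithm, different memory layout, different hash/shard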

pan3793 commented 1 year ago

When the input type mismatches the required type, the error message org.apache.spark.sql.AnalysisException: some_function(arg) is not currently supported is ambiguous and misleading

It sounds like a Spark-side issue. Thanks for providing detailed reproduction steps and your analysis; let me investigate it a bit.

Yxang commented 1 year ago

When the input type mismatches the required type, the error message org.apache.spark.sql.AnalysisException: some_function(arg) is not currently supported is ambiguous and misleading

It sounds like a Spark-side issue. Thanks for providing detailed reproduction steps and your analysis; let me investigate it a bit.

Thanks, but I want to clarify: the ambiguous exception and auto-casting not working are two different issues (limitations no. 3 and 4), and it is the auto-casting issue that I am addressing in the reproduction steps.

The current behavior is that when the input type is not supported, the job fails fast with some_function(arg) is not currently supported. When I try to rely on auto-casting, following the steps with the code modification, it fails as described (a slow failure at the actual writing stage).

pan3793 commented 1 year ago

BTW, Spark 3.4.1 was just released; do you mind sending another PR to upgrade it? Once you get a PR merged, your subsequent PRs will not be blocked by "workflows awaiting approval" to run CI.

pan3793 commented 1 year ago
  1. Build the dev Docker environment. The Spark version needs to be set to 3.4.0, and you may need to manually compile iceberg-spark-runtime and add the jar to the Spark package.

Iceberg 1.3.0 was released with Spark 3.4 support, so master can be upgraded to Spark 3.4.1 and Iceberg 1.3.0; a PR is welcome.

Yxang commented 1 year ago

BTW, Spark 3.4.1 was just released; do you mind sending another PR to upgrade it? Once you get a PR merged, your subsequent PRs will not be blocked by "workflows awaiting approval" to run CI.

Sure thing

pan3793 commented 1 year ago

With the full stacktrace, I think I know what happened on the Spark side.

The Cast was applied in codegen mode but failed due to the lack of a zoneId, and it seems the interpreter mode has another issue where the Cast is not applied at all.
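
A minimal illustration of that diagnosis using Catalyst internals (for explanation only, not a fix): a timestamp-to-date Cast built without a timeZoneId is exactly what hits None.get in the trace below, while the same Cast with a zone attached evaluates fine.

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.DateType

    val ts = Literal(java.sql.Timestamp.valueOf("2021-01-01 10:10:10"))
    val bare = Cast(ts, DateType)      // timeZoneId = None
    // bare.eval() throws java.util.NoSuchElementException: None.get
    // via TimeZoneAwareExpression.zoneId (see castToDateCode in the trace)
    val ok = bare.withTimeZone("UTC")  // what the analyzer normally does
    println(ok.eval())                 // days since epoch for 2021-01-01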

Full stacktrace:

23/06/25 07:30:58 WARN UnsafeProjection: Expr codegen error and falling back to interpreter mode
java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId(datetimeExpressions.scala:63)
    at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId$(datetimeExpressions.scala:63)
    at org.apache.spark.sql.catalyst.expressions.Cast.zoneId$lzycompute(Cast.scala:491)
    at org.apache.spark.sql.catalyst.expressions.Cast.zoneId(Cast.scala:491)
    at org.apache.spark.sql.catalyst.expressions.Cast.castToDateCode(Cast.scala:1655)
    at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeCastFunction(Cast.scala:1335)
    at org.apache.spark.sql.catalyst.expressions.Cast.doGenCode(Cast.scala:1316)
    at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:200)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:195)
    at org.apache.spark.sql.catalyst.expressions.Cast.genCode(Cast.scala:1310)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.$anonfun$prepareArguments$3(objects.scala:124)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.prepareArguments(objects.scala:123)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.prepareArguments$(objects.scala:91)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.prepareArguments(objects.scala:363)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.doGenCode(objects.scala:414)
    at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:200)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:195)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.$anonfun$prepareArguments$3(objects.scala:124)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.prepareArguments(objects.scala:123)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.prepareArguments$(objects.scala:91)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.prepareArguments(objects.scala:363)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.doGenCode(objects.scala:414)
    at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:200)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:195)
    at org.apache.spark.sql.catalyst.expressions.HashExpression.$anonfun$doGenCode$5(hash.scala:304)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.expressions.HashExpression.doGenCode(hash.scala:303)
    at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:200)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:195)
    at org.apache.spark.sql.catalyst.expressions.Pmod.doGenCode(arithmetic.scala:1068)
    at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:200)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:195)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$2(CodeGenerator.scala:1278)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1278)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:290)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:338)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:327)
    at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.createCodeGeneratedObject(Projection.scala:124)
    at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.createCodeGeneratedObject(Projection.scala:120)
    at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51)
    at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:151)
    at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:161)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.getPartitionKeyExtractor$1(ShuffleExchangeExec.scala:316)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$13(ShuffleExchangeExec.scala:384)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$13$adapted(ShuffleExchangeExec.scala:383)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    (the identical WARN UnsafeProjection warning and stack trace repeat three more times; elided)
23/06/25 07:30:58 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-33b99a5f-6160-44ae-957b-f75b0f50e4e2/32/temp_shuffle_1e795d97-2ebe-429c-bfa2-426c4ab78d48
23/06/25 07:30:58 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-33b99a5f-6160-44ae-957b-f75b0f50e4e2/16/temp_shuffle_22a088c0-95d4-4d2c-90b0-14e9030126e3
23/06/25 07:30:58 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-33b99a5f-6160-44ae-957b-f75b0f50e4e2/37/temp_shuffle_ec7d3319-c4d4-4c50-a8fa-158d4b5dd1a5
    (dozens of similar DiskBlockObjectWriter temp_shuffle cleanup warnings elided)
23/06/25 07:30:58 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.SparkException: [INTERNAL_ERROR] A method named "invoke" is not declared in any enclosing class nor any supertype
    at org.apache.spark.SparkException$.internalError(SparkException.scala:77)
    at org.apache.spark.SparkException$.internalError(SparkException.scala:81)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.methodNotDeclaredError(QueryExecutionErrors.scala:452)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.findMethod(objects.scala:173)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.findMethod$(objects.scala:170)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.findMethod(objects.scala:363)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.method$lzycompute(objects.scala:391)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.method(objects.scala:389)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.eval(objects.scala:401)
    at org.apache.spark.sql.catalyst.expressions.HashExpression.eval(hash.scala:292)
    at org.apache.spark.sql.catalyst.expressions.Pmod.eval(arithmetic.scala:1054)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:81)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$6(ShuffleExchangeExec.scala:317)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$6$adapted(ShuffleExchangeExec.scala:317)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$14(ShuffleExchangeExec.scala:386)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
23/06/25 07:30:58 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
23/06/25 07:30:58 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 2)
23/06/25 07:30:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    (each with the same org.apache.spark.SparkException stack trace as above)
23/06/25 07:30:58 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3) (kyuubi executor driver): org.apache.spark.SparkException: [INTERNAL_ERROR] A method named "invoke" is not declared in any enclosing class nor any supertype
    at org.apache.spark.SparkException$.internalError(SparkException.scala:77)
    at org.apache.spark.SparkException$.internalError(SparkException.scala:81)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.methodNotDeclaredError(QueryExecutionErrors.scala:452)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.findMethod(objects.scala:173)
    at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike.findMethod$(objects.scala:170)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.findMethod(objects.scala:363)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.method$lzycompute(objects.scala:391)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.method(objects.scala:389)
    at org.apache.spark.sql.catalyst.expressions.objects.Invoke.eval(objects.scala:401)
    at org.apache.spark.sql.catalyst.expressions.HashExpression.eval(hash.scala:292)
    at org.apache.spark.sql.catalyst.expressions.Pmod.eval(arithmetic.scala:1054)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:81)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$6(ShuffleExchangeExec.scala:317)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$6$adapted(ShuffleExchangeExec.scala:317)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$14(ShuffleExchangeExec.scala:386)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

23/06/25 07:30:58 ERROR TaskSetManager: Task 3 in stage 0.0 failed 1 times; aborting job
Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3) (kyuubi executor driver): org.apache.spark.SparkException: [INTERNAL_ERROR] A method named "invoke" is not declared in any enclosing class nor any supertype
    ... (same stack trace as above)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: [INTERNAL_ERROR] A method named "invoke" is not declared in any enclosing class nor any supertype
    ... (same stack trace as above)


Yxang commented 1 year ago

> BTW, Spark 3.4.1 was just released these days, do you mind sending another PR to upgrade it? Once you get a PR merged, your following PR will not be blocked by "workflows awaiting approval" to run CI.

Please check #254

pan3793 commented 1 year ago

As a rough thought, the timezone resolution issue could be fixed on the Spark side by a patch along these lines:

diff --git forkSrcPrefix/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala forkDstPrefix/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
index 9b1155ef6987e76bcb07add3958e5c124ff7e716..202ba4abab22f2465f2d2ca59ff38c146f244d3d 100644
--- forkSrcPrefix/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
+++ forkDstPrefix/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
@@ -17,11 +17,11 @@

 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, TypeCoercion}
+import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, ResolveTimeZone, TypeCoercion}
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, SortOrder, TransformExpression, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, RepartitionByExpression, Sort}
-import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.connector.catalog.FunctionCatalog
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
 import org.apache.spark.sql.connector.distributions._
@@ -83,13 +83,17 @@ object DistributionAndOrderingUtils {
         queryWithDistribution
       }

-      // Apply typeCoercionRules since the converted expression from TransformExpression
-      // implemented ImplicitCastInputTypes
-      typeCoercionRules.foldLeft(queryWithDistributionAndOrdering)((plan, rule) => rule(plan))
+      ResolveTimezoneAndCastExecutor.execute(queryWithDistributionAndOrdering)
     case _ =>
       query
   }

+  private object ResolveTimezoneAndCastExecutor extends RuleExecutor[LogicalPlan] {
+    override val batches =
+      Batch("Resolve TypeCoercion", Once, typeCoercionRules: _*) ::
+      Batch("Resolve TimeZone", Once, ResolveTimeZone) :: Nil
+  }
+
   private def resolveTransformExpression(expr: Expression): Expression = expr.transform {
     case TransformExpression(scalarFunc: ScalarFunction[_], arguments, Some(numBuckets)) =>
       V2ExpressionUtils.resolveScalarFunction(scalarFunc, Seq(Literal(numBuckets)) ++ arguments)
pan3793 commented 1 year ago

@Yxang SPARK-44180 (https://github.com/apache/spark/pull/41725) has been opened to address the cast issue (with codegen enabled). Until it is fixed, you can construct non-timezone-aware cases to test the implicit cast feature.

Yxang commented 1 year ago

Thanks for the fast fix! I will investigate other casting cases to check whether this works. Should we wait for the Spark patch to be merged, or create another patch later?

pan3793 commented 1 year ago

Spark has a quite long release schedule, and SPARK-44180 only fixes the timezone-aware cast case; it does not block us from doing other work.

pan3793 commented 1 year ago

Sorry, I'm busy with other things these days. I did find some spare time to play with the code and found some room for improvement; it may take several days to give detailed feedback.

Yxang commented 1 year ago

Thanks, and also sorry for being busy with other work. Though I do not fully understand the implicit type conversion problem, I have some updates:

  1. It seems the "invoke function not found" error in interpreted mode occurs (at least partly) because the implicit type conversion is not applied for some reason: the error goes away if we implement the invoke method for the original, uncast type.
  2. On the connector side, besides the sort introduced by RequiresDistributionAndOrdering, the implicit type conversion issue can also occur when we use shardProjection to determine which shard a row belongs to. There we generate a projection from the Catalyst expression derived from the ClickHouse sharding expression, but the type conversion is missing as well. I am working on a fix now.

Also, I used positiveModulo when computing the shard, but it does not exist in earlier versions of ClickHouse. This does not affect shard handling, which happens entirely in Spark, but the UDF itself is problematic there. I am working on this as well; a sketch of the semantics in question is shown below.
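A minimal sketch (illustrative, not this PR's actual code) of the positive-modulo semantics being discussed: Spark's pmod and ClickHouse's positiveModulo both return a non-negative remainder, unlike the JVM's % operator, which matters when mapping negative hash values to shard indices:

    def pmod(a: Long, n: Long): Long = {
      val r = a % n
      if (r < 0) r + n else r // pmod(-3, 5) == 2, whereas -3 % 5 == -3 on the JVM
    }

    // e.g. picking the target shard from a 64-bit hash of the sharding key:
    def shardOf(hash: Long, numShards: Int): Int = pmod(hash, numShards.toLong).toInt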

It seems this PR needs some more polishing, so I am converting it to a draft for now. Does marking it as a draft interfere with reviewing? Canceled it.

Yxang commented 1 year ago

Pushed commits fixing the shardProjection issue and the pmod issue described above.

pan3793 commented 1 year ago
> 1. It seems the "invoke function not found" error in interpreted mode occurs (at least partly) because the implicit type conversion is not applied for some reason: the error goes away if we implement the invoke method for the original, uncast type.

That's right. By default, Spark prefers codegen mode to execute expressions and falls back to interpreted mode if that fails. SPARK-44180 fixes the codegen mode; I haven't looked into what happens in interpreted mode, but I think it is a separate issue and will find time to take a look. TBH, interpreted mode is rarely used in production.
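For context, here is a minimal sketch of the DataSourceV2 function shape under discussion: an UnboundFunction/ScalarFunction exposing both the "magic" invoke method (resolved reflectively when Spark plans an Invoke expression, as in the stack trace above) and produceResult (the row-based fallback). This is illustrative only, not the PR's actual Months implementation; the class name and the toYYYYMM semantics are assumptions for the example:

    import java.time.LocalDate
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
    import org.apache.spark.sql.types._

    object MonthsSketch extends UnboundFunction with ScalarFunction[Int] {
      override def name: String = "clickhouse_months_sketch"
      override def canonicalName: String = s"clickhouse.$name"
      override def description: String = s"$name: (date: DATE) => yyyymm: INT"

      // Binding only accepts DATE here; widening this match to timestamp types
      // is exactly the step that triggered the interpreted-mode failure above.
      override def bind(inputType: StructType): BoundFunction = inputType.fields match {
        case Array(StructField(_, DateType, _, _)) => this
        case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument, got $inputType")
      }

      override def inputTypes: Array[DataType] = Array(DateType)
      override def resultType: DataType = IntegerType

      // "Magic" method looked up by name; Spark stores DATE physically as
      // days since the epoch, so the argument arrives as Int.
      def invoke(days: Int): Int = {
        val d = LocalDate.ofEpochDay(days)
        d.getYear * 100 + d.getMonthValue // toYYYYMM-style result, e.g. 202301
      }

      // Row-based fallback used when Spark does not plan the Invoke path.
      override def produceResult(input: InternalRow): Int = invoke(input.getInt(0))
    }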

> One advantage of using ByteBuf, though, is that if in the future we want to support more than UTF-8 strings, we can handle the conversion between raw byte arrays and the target type via e.g. ByteBuf.writeDoubleLE.

writeDoubleLE could simply be implemented as a util method; I would prefer byte[] instead, because constructing a ByteBuf instance from bytes for each record is not cheap.
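A minimal sketch of such a util method (the helper name is hypothetical): encode a Double as little-endian bytes into a plain byte[], with no per-record ByteBuf allocation:

    // byte[]-based equivalent of ByteBuf.writeDoubleLE.
    def doubleToLittleEndianBytes(value: Double): Array[Byte] = {
      val bits = java.lang.Double.doubleToLongBits(value)
      val out = new Array[Byte](java.lang.Long.BYTES)
      var i = 0
      while (i < out.length) {
        out(i) = ((bits >>> (8 * i)) & 0xff).toByte // least-significant byte first
        i += 1
      }
      out
    }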

I think we can split out the non-Spark part of this PR and merge it first, specifically:

  1. move CityHash_v1_0_2 and UInt128 to the clickhouse-core module, maybe under package xenon.clickhouse.hash
  2. move spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Util.scala to the clickhouse-core module too, maybe calling it HashUtils under package xenon.clickhouse.hash
  3. refer to https://github.com/housepower/spark-clickhouse-connector/pull/260 and write some tests to make sure that the hash functions get the same results as ClickHouse (a sketch of such a test follows this list)
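A hedged sketch of such a parity test: query a live ClickHouse server over JDBC for the reference value, so no expected hashes need to be hard-coded. The JDBC URL and the CityHash_v1_0_2.CityHash64(bytes, pos, len) signature are assumptions to adapt once the class lands in clickhouse-core:

    import java.nio.charset.StandardCharsets
    import java.sql.DriverManager

    object CityHashParitySketch {
      def main(args: Array[String]): Unit = {
        // Assumed local test server, as in the dev docker environment.
        val conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123")
        try {
          val stmt = conn.createStatement()
          for (s <- Seq("", "a", "spark-clickhouse-connector")) {
            val rs = stmt.executeQuery(s"SELECT toString(cityHash64('$s'))")
            rs.next()
            // ClickHouse returns UInt64; parse into the same two's-complement Long.
            val expected = java.lang.Long.parseUnsignedLong(rs.getString(1))
            val bytes = s.getBytes(StandardCharsets.UTF_8)
            val actual = CityHash_v1_0_2.CityHash64(bytes, 0, bytes.length) // assumed signature
            assert(actual == expected, s"cityHash64 mismatch for input '$s'")
          }
        } finally conn.close()
      }
    }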
Yxang commented 1 year ago

Oops, I messed up the diffs; pushed a fix.

Opened PR #261 for the hash part described in the last comment. I will adjust the code in this PR once that one is merged. Please kindly review it.

pan3793 commented 1 year ago

@Yxang No worries. Thanks for your excellent work.

Yxang commented 1 year ago

@pan3793 Adjusted the code now that #261 is merged. However, I am not sure the code is as simplified as you imagined; we still need to handle binding and the ClickHouse function name mapping. Please kindly review it and check if anything needs improvement!