kwai / blaze

Blazing-fast query execution engine that speaks the Apache Spark language and has Arrow-DataFusion at its core.
Apache License 2.0

Spark Scala UDF always fails with: External error: Java exception thrown at native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs:94: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF #648

Open frencopei opened 1 week ago

frencopei commented 1 week ago

Describe the bug
Spark version: 3.2.2
Scala version: 2.12.15
Blaze version: 4.0

yarn executor log:

2024-11-14 18:54:08,243 [ERROR] [task-result-getter-0] Task 0 in stage 0.0 failed 4 times; aborting job (org.apache.spark.scheduler.TaskSetManager(org.apache.spark.internal.Logging.logError:73))
2024-11-14 18:54:08,261 [ERROR] [Driver] User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 76) (10.16.7.17 executor 1): java.lang.RuntimeException: [partition=0] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[Project]: output() returns error: External error: Java exception thrown at native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs:94: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
Driver stacktrace: (org.apache.spark.deploy.yarn.ApplicationMaster(org.apache.spark.internal.Logging.logError:94))
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 76) (10.16.7.17 executor 1): java.lang.RuntimeException: [partition=0] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[Project]: output() returns error: External error: Java exception thrown at native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs:94: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: [partition=0] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[Project]: output() returns error: External error: Java exception thrown at native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs:94: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
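For context: this specific ClassCastException is the JVM's generic failure mode for Java-serialized Scala 2.12 lambdas. A function literal compiles to an invokedynamic lambda and serializes as a java.lang.invoke.SerializedLambda, which only turns back into a scala.Function1 when readResolve can reach the lambda's capturing class through the deserializing classloader. When the ScalaUDF is deserialized for Blaze's spark_udf_wrapper and that resolution does not happen (a classloader mismatch on the executor is the usual report), the raw SerializedLambda ends up assigned to the Function1-typed field f and the cast fails. Below is a minimal sketch of the round-trip outside Spark; the object and names are illustrative, not taken from Blaze:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LambdaRoundTrip {
  def main(args: Array[String]): Unit = {
    // Scala 2.12 compiles this literal to an invokedynamic lambda, so
    // ObjectOutputStream writes a java.lang.invoke.SerializedLambda for it.
    val f: String => String = _.toUpperCase

    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(f)
    out.close()

    // This succeeds because the capturing class (LambdaRoundTrip) is visible
    // to the deserializing classloader, so readResolve rebuilds the Function1.
    // If the capturing class cannot be properly resolved on the deserializing
    // side, the lambda is not rebuilt, and a raw SerializedLambda is what
    // gets assigned to a Function1-typed field such as ScalaUDF.f.
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val g = in.readObject().asInstanceOf[String => String]
    println(g("it works"))
  }
}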

To Reproduce

1. Code:

import com..task.CommonMain
import com..task.bi.commom.CommonFilterCondition.BOOTSTRAP_FILTER_CONDITION
import com..task.bi.commom.CommonHiveUtils.{getHiveDataByMonth, getHiveDataByWeek}
import com..task.panoramicview.ClickHouseUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

object MyExample extends CommonMain {

  val RESULT_TABLE = "ads_nps_stability_audio_exception_mi_cluster"
  val AUDIO_TABLE = "terminal_tv.ods_nps_stability_audio_dfx_exception_di"

  override def execute(spark: SparkSession, calcDate: String): Unit = {
    import spark.implicits._

    val firstDay = calcDate.substring(0, 7) + "-01"

    // read hive table
    val result = getHiveDataByMonth(spark, AUDIO_TABLE, calcDate)
      .filter(BOOTSTRAP_FILTER_CONDITION)
      .filter("deviceType is not null and deviceType not in ('','000')")
      .select("dnum", "deviceType", "tclOsVersion", "exceptionName", "halAlgResult",
        "underrunCnt", "repairAction", "repairSuccess", "trackPkgName")
      .withColumn("underloadLevel", valueRangeUDF(col("underrunCnt")))
      .withColumn("trackPkgName", expr("case when trackPkgName is not null and trackPkgName not in ('','null') then trackPkgName else 'unknown' end"))
      .withColumn("chip", split($"deviceType", "-")(2))
      .select("dnum", "chip", "deviceType", "tclOsVersion", "exceptionName", "halAlgResult",
        "underloadLevel", "repairAction", "repairSuccess", "trackPkgName")
      .groupBy("chip", "deviceType", "tclOsVersion", "exceptionName", "halAlgResult",
        "underloadLevel", "repairAction", "repairSuccess", "trackPkgName")
      .agg(countDistinct("dnum").alias("machineCount"), count("dnum").alias("timeCount"))
      .withColumn("recordDate", lit(firstDay))
      .select("chip", "deviceType", "tclOsVersion", "exceptionName", "halAlgResult",
        "underloadLevel", "repairAction", "repairSuccess", "trackPkgName",
        "machineCount", "timeCount", "recordDate")

    result.show()
    ClickHouseUtils.saveToDb(result, RESULT_TABLE, firstDay)
  }

  def valueRangeUDF: UserDefinedFunction = udf { (valueStr: String) =>
    val value = valueStr.toDouble
    // return a range description for the value
    // note: negative values and values in (1000, 10000] fall through to the default case
    value match {
      case v if v == 0 => "0"
      case v if v > 0 && v <= 100 => "(0-100]"
      case v if v > 100 && v <= 200 => "(100-200]"
      case v if v > 200 && v <= 300 => "(200-300]"
      case v if v > 300 && v <= 400 => "(300-400]"
      case v if v > 400 && v <= 500 => "(400-500]"
      case v if v > 500 && v <= 600 => "(500-600]"
      case v if v > 600 && v <= 700 => "(600-700]"
      case v if v > 700 && v <= 800 => "(700-800]"
      case v if v > 800 && v <= 900 => "(800-900]"
      case v if v > 900 && v <= 1000 => "(900-1000]"
      case v if v > 10000 && v <= 15000 => "(10000-15000]"
      case v if v > 15000 && v <= 20000 => "(15000-20000]"
      case v if v > 20000 && v <= 25000 => "(20000-25000]"
      case v if v > 25000 && v <= 30000 => "(25000-30000]"
      case v if v > 30000 => "大于 30000"
      case _ => "Invalid Range"
    }
  }
}

2. After building the jar with Maven and running it with spark-submit, it always fails; it seems Blaze cannot convert the UDF class (a possible workaround is sketched below).
/usr/local/bin/spark-submit \
  --class com.tcl.task.MyExample \
  --master yarn \
  --jars .... \
  --executor-memory 12G \
  --executor-cores 6 \
  --driver-memory 6g \
  --conf spark.sql.broadcastTimeout=600 \
  --deploy-mode cluster \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=4 \
  --conf spark.dynamicAllocation.maxExecutors=8 \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  cosn://dc-bucket/myapp/MyScalaExample.jar 2024-10-12 yarn

exception: [screenshot of the exception attached]
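A workaround commonly suggested for this class of error, sketched here under the assumption that the lambda serialization round-trip is what breaks (not a confirmed Blaze fix): implement the UDF body as a named Function1 class instead of an anonymous lambda, so Java serialization writes an ordinary object rather than a SerializedLambda. The class name ValueRangeFn below is illustrative:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// A named, serializable Function1: Java serialization writes a plain object
// graph for it, so no SerializedLambda is involved on the executor side.
class ValueRangeFn extends (String => String) with Serializable {
  override def apply(valueStr: String): String = {
    val value = valueStr.toDouble
    value match {
      case v if v == 0 => "0"
      case v if v > 0 && v <= 100 => "(0-100]"
      case v if v > 30000 => "大于 30000"
      case _ => "Invalid Range" // remaining buckets exactly as in the original match
    }
  }
}

// Inside MyExample, the original def would then become:
def valueRangeUDF: UserDefinedFunction = udf(new ValueRangeFn)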

wfxxh commented 3 days ago

I have got the same error. [screenshot: QQ_1731993126356]

My UDF is: [screenshot: QQ_1731993186655]