
[Bug] Executing `spark.sql("add jar path/xxx")` from Spark code throws `ClassNotFound with classloader: org.apache.spark.util.MutableURLClassLoader` #2926

Open FishManHome opened 2 years ago

FishManHome commented 2 years ago

Describe the bug

Based on #2471 and #2475: executing `spark.sql("add jar path/xxx")` from Spark code throws `ClassNotFound with classloader: org.apache.spark.util.MutableURLClassLoader`.

My Spark code:

spark.sql("add jar file:/xxxxx.jar") import org.apache.spark.sql.{SaveMode, SparkSession}
import com.datastax.spark.connector.cql.CassandraConnectorConf import org.apache.spark.sql.cassandra._ spark.setCassandraConf(CassandraConnectorConf.KeepAliveMillisParam.option(10000)) val writeDf = spark.read.parquet("xxxx") writeDf.printSchema() val cassandraMap = Map(table -> "xxxx", keyspace -> " xxxx") writeDf.write.format("org.apache.spark.sql.cassandra").options(cassandraMap).mode(SaveMode.Append).save()
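Since the stage failure names `org.apache.spark.util.MutableURLClassLoader`, one way to narrow this down is to probe class visibility right after the ADD JAR statement. A minimal diagnostic sketch using only standard JVM APIs (the class name is taken from the imports above; note this checks the driver-side loader only, while the stage failure suggests it is the executors that cannot resolve the class):

// Diagnostic sketch: probe the current thread's context classloader for a
// class that should come from the added jar.
val cl = Thread.currentThread.getContextClassLoader
println(cl.getClass.getName) // e.g. org.apache.spark.util.MutableURLClassLoader
// Throws ClassNotFoundException if the added jar is not visible to this loader:
Class.forName("com.datastax.spark.connector.cql.CassandraConnectorConf", false, cl)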

Affects Version(s)

master

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

writeDf.write.format("org.apache.spark.sql.cassandra").options(cassandraMap).mode(SaveMode.Append).save()
 org.apache.spark.SparkException: Writing job aborted.
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
  at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
  at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
  at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
  ... 57 elided
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ClassNotFound with classloader: org.apache.spark.util.MutableURLClassLoader@1c852c0f
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
  ... 85 more
        at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
        at org.apache.kyuubi.engine.spark.operation.ExecuteScala.$anonfun$runInternal$1(ExecuteScala.scala:84)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:88)
        at org.apache.kyuubi.engine.spark.operation.ExecuteScala.runInternal(ExecuteScala.scala:60)
        at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:133)
        at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:94)
        at org.apache.kyuubi.engine.spark.session.SparkSessionImpl.runOperation(SparkSessionImpl.scala:66)
        at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:124)
        at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:75)
        at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:121)
        at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:66)
        at org.apache.kyuubi.service.TFrontendService.ExecuteStatement(TFrontendService.scala:232)
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

No response

Additional context

No response

Are you willing to submit a PR?

yaooqinn commented 2 years ago

cc @iodone, can you take a look at this?

iodone commented 2 years ago

Yes, I'm following up on this.

iodone commented 2 years ago

I tried to reproduce the problem with the following example:

With Beeline:

SET kyuubi.operation.language=scala;
spark.sql("add jar file:/Users/work/tmp/jar-test/target/spark_to_cassandra_jars-1.0-SNAPSHOT.jar");
import org.apache.spark.sql.{SaveMode, SparkSession};
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import org.apache.spark.sql.cassandra._;
spark.setCassandraConf(CassandraConnectorConf.KeepAliveMillisParam.option(10000));
val writeDf = spark.read.parquet("/Users/work/tmp/test.parquet");
val x = writeDf.limit(10);
result.set(x);
val cassandraMap = Map("table" -> "test", "keyspace" -> "store");
val df = spark.read.format("org.apache.spark.sql.cassandra").options(cassandraMap).load;
val y = df.limit(10);
result.set(y);
writeDf.write.format("org.apache.spark.sql.cassandra").options(cassandraMap).mode(SaveMode.Append).save()
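
(The statements above assume a beeline session connected to Kyuubi's Thrift frontend, by default at `jdbc:hive2://<host>:10009/`; the host and port are deployment-specific.)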

I tested both the Cassandra read and write paths; everything works well.
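
For comparison, when the connector jar does not have to be added at runtime, it can be placed on the classpath when the engine's Spark session is launched, using Spark's standard `spark.jars` property. A minimal sketch (the builder call and jar path are illustrative placeholders, not taken from this report):

import org.apache.spark.sql.SparkSession

// Sketch only: ship the connector jar at launch via the standard
// `spark.jars` property instead of a runtime `ADD JAR`.
// The path below is a placeholder.
val spark = SparkSession.builder()
  .config("spark.jars", "file:/path/to/spark-cassandra-connector-assembly.jar")
  .getOrCreate()

In a Kyuubi deployment the same property can be set in the engine's Spark configuration (e.g. spark-defaults.conf).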

yaooqinn commented 2 years ago

cc @FishManHome