apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Spark] SeaTunnel on Spark throws a conversion exception #7011

Closed — zhang5059T closed this issue 3 months ago

zhang5059T commented 4 months ago

Search before asking

What happened

Environment

The job is submitted with the spark-operator to run the SeaTunnel project.

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  parallelism = 8
}
source {
    Jdbc {
        url = "jdbc:oracle:thin:@x.x.x.x:1521:orcl"
        driver = "oracle.jdbc.OracleDriver"
        user = "seatunnel"
        password = "seatunnel123456"
        query = "select * from seatunnel.FA_DATA_23_15"
    }
}
sink {
    Jdbc {
        url = "jdbc:oracle:thin:@x.x.x.x:1521:orcl"
        driver = "oracle.jdbc.OracleDriver"
        user = "seatunnel01"
        password = "seatunnel123456"
        generate_sink_sql = true
        database = ORCL
        table = "FA_DATA_23_15"
    }
}

Running Command

SparkApplication resource

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-seatunnel
  namespace: spark-operator
spec:
  type: Scala
  mode: cluster
  image: "datawork/spark3.3.0-seatunnal2.3.5:v2-r1"
  mainClass: org.apache.seatunnel.core.starter.spark.SeaTunnelSpark
  arguments: ["--config", "/data/v2.oracle.conf"]
  mainApplicationFile: "http://x.x.x.x:32175/spark/seatunnel/seatunnel-spark-3-starter-1.jar"
  imagePullPolicy: Always
  sparkVersion: "3.3.0"
  restartPolicy:
    type: Never
  volumes:
    - name: seatunnel-oracle
      configMap:
        name: seatunnel-oracle
        items:
          - key: v2.oracle.conf
            path: v2.oracle.conf
  deps:
    jars: ["local:///opt/seatunnel/lib/ojdbc8-23.4.0.24.05.jar", "local:///opt/seatunnel/lib/seatunnel-transforms-v2.jar"]
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.3.0
    serviceAccount: spark-release-spark-operator
    env:
      - name: SEATUNNEL_HOME
        value: "/opt/seatunnel"
    volumeMounts:
      - name: seatunnel-oracle
        mountPath: /data/v2.oracle.conf
        subPath: v2.oracle.conf
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    env:
      - name: SEATUNNEL_HOME
        value: "/opt/seatunnel"
    labels:
      version: 3.3.0
    volumeMounts:
      - name: seatunnel-oracle
        mountPath: /data/v2.oracle.conf
        subPath: v2.oracle.conf

Error Exception

### Exception
24/06/18 07:29:58 INFO AbstractJdbcCatalog: Catalog Oracle closing
24/06/18 07:29:58 INFO V2ScanRelationPushDown: 

24/06/18 07:29:58 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/06/18 07:29:59 INFO AppendDataExec: Start processing data source write support: org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite@64514009. The input RDD has 8 partitions.
24/06/18 07:29:59 INFO SparkContext: Starting job: save at SinkExecuteProcessor.java:162
24/06/18 07:29:59 INFO DAGScheduler: Got job 0 (save at SinkExecuteProcessor.java:162) with 8 output partitions
24/06/18 07:29:59 INFO DAGScheduler: Final stage: ResultStage 0 (save at SinkExecuteProcessor.java:162)
24/06/18 07:29:59 INFO DAGScheduler: Parents of final stage: List()
24/06/18 07:29:59 INFO DAGScheduler: Missing parents: List()
24/06/18 07:29:59 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at save at SinkExecuteProcessor.java:162), which has no missing parents
24/06/18 07:29:59 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 102.2 KiB, free 116.9 MiB)
24/06/18 07:29:59 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.5 KiB, free 116.8 MiB)
24/06/18 07:29:59 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on spark-seatunnel-792859902a4065c7-driver-svc.spark-operator.svc:7079 (size: 25.5 KiB, free: 116.9 MiB)
24/06/18 07:29:59 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1513
24/06/18 07:29:59 INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at save at SinkExecuteProcessor.java:162) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7))
24/06/18 07:29:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 8 tasks resource profile 0
24/06/18 07:29:59 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (10.16.59.102, executor 1, partition 0, PROCESS_LOCAL, 4604 bytes) taskResourceAssignments Map()
24/06/18 07:29:59 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.16.59.102:45881 (size: 25.5 KiB, free: 116.9 MiB)
24/06/18 07:29:59 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (10.16.59.102, executor 1, partition 1, PROCESS_LOCAL, 4604 bytes) taskResourceAssignments Map()
24/06/18 07:29:59 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.16.59.102 executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

24/06/18 07:29:59 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 2) (10.16.59.102, executor 1, partition 0, PROCESS_LOCAL, 4604 bytes) taskResourceAssignments Map()
24/06/18 07:29:59 INFO TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) on 10.16.59.102, executor 1: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 1]
24/06/18 07:29:59 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 3) (10.16.59.102, executor 1, partition 1, PROCESS_LOCAL, 4604 bytes) taskResourceAssignments Map()
24/06/18 07:29:59 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 2) on 10.16.59.102, executor 1: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 2]
24/06/18 07:29:59 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 4) (10.16.59.102, executor 1, partition 0, PROCESS_LOCAL, 4604 bytes) taskResourceAssignments Map()
24/06/18 07:29:59 INFO TaskSetManager: Lost task 1.1 in stage 0.0 (TID 3) on 10.16.59.102, executor 1: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 3]
24/06/18 07:29:59 INFO TaskSetManager: Starting task 1.2 in stage 0.0 (TID 5) (10.16.59.102, executor 1, partition 1, PROCESS_LOCAL, 4604 bytes) taskResourceAssignments Map()
24/06/18 07:29:59 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 4) on 10.16.59.102, executor 1: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 4]
24/06/18 07:29:59 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 6) (10.16.59.102, executor 1, partition 0, PROCESS_LOCAL, 4604 bytes) taskResourceAssignments Map()
24/06/18 07:29:59 INFO TaskSetManager: Lost task 1.2 in stage 0.0 (TID 5) on 10.16.59.102, executor 1: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 5]
24/06/18 07:29:59 INFO TaskSetManager: Starting task 1.3 in stage 0.0 (TID 7) (10.16.59.102, executor 1, partition 1, PROCESS_LOCAL, 4604 bytes) taskResourceAssignments Map()
24/06/18 07:29:59 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 6) on 10.16.59.102, executor 1: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 6]
24/06/18 07:29:59 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
24/06/18 07:29:59 INFO TaskSchedulerImpl: Cancelling stage 0
24/06/18 07:29:59 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
24/06/18 07:29:59 INFO TaskSchedulerImpl: Stage 0 was cancelled
24/06/18 07:29:59 INFO DAGScheduler: ResultStage 0 (save at SinkExecuteProcessor.java:162) failed in 0.784 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (10.16.59.102 executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
        at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:
24/06/18 07:29:59 INFO DAGScheduler: Job 0 failed: save at SinkExecuteProcessor.java:162, took 0.812955 s
24/06/18 07:29:59 INFO TaskSetManager: Lost task 1.3 in stage 0.0 (TID 7) on 10.16.59.102, executor 1: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 7]
24/06/18 07:29:59 ERROR AppendDataExec: Data source write support org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite@64514009 is aborting.
24/06/18 07:29:59 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
24/06/18 07:29:59 ERROR AppendDataExec: Data source write support org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite@64514009 aborted.
24/06/18 07:29:59 ERROR SeaTunnel: 

===============================================================================

24/06/18 07:29:59 ERROR SeaTunnel: Fatal Error, 

24/06/18 07:29:59 ERROR SeaTunnel: Please submit bug report in https://github.com/apache/seatunnel/issues

24/06/18 07:29:59 ERROR SeaTunnel: Reason:Run SeaTunnel on spark failed 

24/06/18 07:29:59 ERROR SeaTunnel: Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Writing job aborted
        at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:749)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:244)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:244)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:311)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
        at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:162)
        at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:71)
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
        ... 14 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (10.16.59.102 executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
        at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:377)
        ... 49 more
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
        at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

24/06/18 07:29:59 ERROR SeaTunnel: 
===============================================================================

24/06/18 07:29:59 INFO SparkUI: Stopped Spark web UI at http://spark-seatunnel-792859902a4065c7-driver-svc.spark-operator.svc:4040
24/06/18 07:29:59 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
24/06/18 07:29:59 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
24/06/18 07:29:59 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
24/06/18 07:30:00 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/06/18 07:30:00 INFO MemoryStore: MemoryStore cleared
24/06/18 07:30:00 INFO BlockManager: BlockManager stopped
24/06/18 07:30:00 INFO BlockManagerMaster: BlockManagerMaster stopped
24/06/18 07:30:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/06/18 07:30:00 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Writing job aborted
        at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:749)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:244)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:244)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:311)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
        at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:162)
        at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:71)
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
        ... 14 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (10.16.59.102 executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
        at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:377)
        ... 49 more
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
        at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
24/06/18 07:30:00 INFO ShutdownHookManager: Shutdown hook called
24/06/18 07:30:00 INFO ShutdownHookManager: Deleting directory /var/data/spark-9b97047f-2828-4040-ba32-006362a8a7a8/spark-62fd0da7-d77a-4a84-8bde-1a33ee88a05b
24/06/18 07:30:00 INFO ShutdownHookManager: Deleting directory /tmp/spark-0c62c6bf-2576-4523-9d65-c3d8645cb94d

Zeta or Flink or Spark Version

Spark 3.3.0

SeaTunnel 2.3.5

Java or Scala Version

Scala (bundled with Spark): 2.12.15

Java: OpenJDK 11.0.13

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

zhang5059T commented 4 months ago

I just set up a standalone cluster with Spark 3.3.0 and found that SeaTunnel runs normally there, but when I submit the task through the spark-operator, this error occurs. I'm not sure yet whether it is caused by a Scala version problem.
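Since the same job runs fine on the standalone cluster, one low-risk check is whether the driver and executor pods started by the spark-operator see exactly the same Spark and Scala jars, because this ClassCastException during task deserialization usually points at mismatched or duplicated classes between the two sides. A minimal diagnostic sketch, assuming kubectl access to the spark-operator namespace; the pod names are illustrative (the driver pod name is taken from the submit command quoted later in this thread, the executor pod name is a placeholder):

# Compare the Spark/Scala jars visible to the driver and to an executor pod
kubectl -n spark-operator exec spark-seatunnel-driver -- ls /opt/spark/jars | grep -E 'scala-library|spark-core'
kubectl -n spark-operator exec <executor-pod-name> -- ls /opt/spark/jars | grep -E 'scala-library|spark-core'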

Carl-Zhou-CN commented 4 months ago

It seems to be an issue with the Spark Operator itself. Could you try using this image and configuring a non-seatunnel task to see if the problem persists?
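For reference, a minimal non-SeaTunnel smoke test could be the SparkPi example bundled with the Spark distribution, submitted with the same image and service account. This is only a sketch: the API server address is a placeholder, and the examples jar path assumes the standard Spark 3.3.0 image layout; image name, namespace, and service account are copied from this thread:

/opt/spark/bin/spark-submit \
  --master k8s://https://<api-server>:443 \
  --deploy-mode cluster \
  --name spark-pi-smoke-test \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.namespace=spark-operator \
  --conf spark.kubernetes.container.image=datawork/spark3.3.0-seatunnal2.3.5:v2-r1 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-release-spark-operator \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0.jar 1000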

zhang5059T commented 4 months ago

Hi @Carl-Zhou-CN Thank you for your answer.

I am able to run the official demo program without any issues. I noticed that there is a Scala package inside the seatunnel-spark-3-starter.jar file, and I suspect it might be causing the problem. After looking at the code, I found that only scala.Tuple2 is used in SeaTunnel. Is there an alternative that could replace it?

I also saw that the scala.version and scala.binary.version in the Spark 3.3.0 packaging pom are the same as those of SeaTunnel.
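If it helps, the Scala version actually shipped inside the image can be checked directly; a small sketch, assuming the Spark distribution is installed under /opt/spark as in the submit command shown later in this thread:

# Scala version the Spark build was compiled against
/opt/spark/bin/spark-submit --version 2>&1 | grep -i 'scala version'
# Scala library jar bundled with the Spark distribution
ls /opt/spark/jars | grep '^scala-library'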

Carl-Zhou-CN commented 4 months ago

[screenshot] Hi @zhang5059T, it seems to be more than that.

zhang5059T commented 4 months ago

Oh, exactly. I overlooked a lot just now.

Apart from this direction, it seems I have no other ideas for handling this problem. Could you give me some suggestions?

Carl-Zhou-CN commented 4 months ago

What happens if you manually delete the scala package in seatunnel-spark-3-starter.jar?
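In case it is useful, one way to try that without rebuilding is to strip the bundled scala/ entries from the jar in place; a sketch that assumes the jar and zip tools are available on the machine or image:

cp seatunnel-spark-3-starter.jar seatunnel-spark-3-starter.jar.bak    # keep a backup first
jar tf seatunnel-spark-3-starter.jar | grep '^scala/' | head          # list the bundled Scala classes
zip -d seatunnel-spark-3-starter.jar 'scala/*'                        # delete them from the jar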

zhang5059T commented 4 months ago

I deleted the Scala package directly from seatunnel-spark-3-starter.jar, and it still reports the same error.

Carl-Zhou-CN commented 4 months ago

@CheneyYin Can you please help take a look at this?

zhang5059T commented 4 months ago

This is the submit command printed in the log:

/opt/spark/bin/spark-submit 
--class org.apache.seatunnel.core.starter.spark.SeaTunnelSpark 
--master k8s://https://192.18.0.1:443 
--deploy-mode cluster 
--conf spark.kubernetes.namespace=spark-operator 
--conf spark.app.name=spark-seatunnel 
--conf spark.kubernetes.driver.pod.name=spark-seatunnel-driver 
--jars http://x.x.x.x:9000/spark/seatunnel/ojdbc8-23.4.0.24.05.jar,http://x.x.x.x:9000/spark/seatunnel/seatunnel-transforms-v2.jar,http://x.x.x.x:9000/spark/seatunnel/seatunnel-hadoop3-3.1.4-uber.jar 
--conf spark.kubernetes.container.image=172.18.172.12/datawork/spark3.3.0-seatunnal2.3.5:v2-r2 
--conf spark.kubernetes.container.image.pullPolicy=Always 
--conf spark.kubernetes.submission.waitAppCompletion=false 
--conf spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=spark-seatunnel 
--conf spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true 
--conf spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=f5a15d17-ca8f-48cf-8fa1-4052dc3e81f8 
--conf spark.driver.cores=1 
--conf spark.kubernetes.driver.limit.cores=1200m 
--conf spark.driver.memory=512m 
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-release-spark-operator 
--conf spark.kubernetes.driver.label.version=3.3.0 
--conf spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=spark-seatunnel 
--conf spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true 
--conf spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=f5a15d17-ca8f-48cf-8fa1-4052dc3e81f8 
--conf spark.executor.instances=1 --conf spark.executor.cores=1 
--conf spark.executor.memory=512m 
--conf spark.kubernetes.executor.label.version=3.3.0 
http://x.x.x.x:9000/spark/seatunnel/seatunnel-spark-3-starter.jar 
--config /data/v2.oracle.conf

zhang5059T commented 4 months ago

Hi @Carl-Zhou-CN, I have a doubt. In the example given in the official documentation, --deploy-mode is client. How should I submit in cluster mode? I tried changing it directly to cluster, but it throws an exception saying the config file cannot be found. I copied the config file to all the worker nodes and ran it using the absolute path, but it still throws the same file-not-found exception. I also put the config file in MinIO, but that didn't work either.

CMD

./bin/start-seatunnel-spark-3-connector-v2.sh \
--master spark://master.cluster.local:7077 \
--deploy-mode cluster \
--executor-memory 4g \
--config /home/zcp/seatunnelworkspace/seatunnel/config/v2.oracle2.conf

./bin/start-seatunnel-spark-3-connector-v2.sh \
--master spark://master.cluster.local:7077 \
--deploy-mode cluster \
--executor-memory 4g \
--config http://x.x.x.x:9000/minio/spark/seatunnel/v2.oracle2.conf

./bin/start-seatunnel-spark-3-connector-v2.sh \
--master spark://master.cluster.local:7077 \
--deploy-mode cluster \
--executor-memory 4g \
--files /home/zcp/seatunnelworkspace/seatunnel/config/v2.oracle2.conf \
--config /home/zcp/seatunnelworkspace/seatunnel/config/v2.oracle2.conf

Spark commit cmd

${SPARK_HOME}/bin/spark-submit 
--class "org.apache.seatunnel.core.starter.spark.SeaTunnelSpark" 
--name "SeaTunnel" --master "spark://master.cluster.local:7077" 
--deploy-mode "cluster" 
--jars "/home/zcp/seatunnelworkspace/seatunnel/lib/seatunnel-transforms-v2.jar,/home/zcp/seatunnelworkspace/seatunnel/lib/seatunnel-hadoop3-3.1.4-uber.jar,/home/zcp/seatunnelworkspace/seatunnel/lib/ojdbc8-23.4.0.24.05.jar,/home/zcp/seatunnelworkspace/seatunnel/lib/mysql-connector-java-8.0.30.jar,/home/zcp/seatunnelworkspace/seatunnel/connectors/connector-jdbc-2.3.5.jar" 
--files "/home/zcp/seatunnelworkspace/seatunnel/plugins.tar.gz,/home/zcp/seatunnelworkspace/seatunnel/config/v2.oracle2.conf" 
--conf "parallelism=8" 
/home/zcp/seatunnelworkspace/seatunnel/starter/seatunnel-spark-3-starter.jar 
--config "/home/zcp/seatunnelworkspace/seatunnel/config/v2.oracle2.conf" 
--master "spark://master.cluster.local:7077" 
--deploy-mode "cluster" 
--name "SeaTunnel"

Exception

24/06/19 22:31:32 ERROR SeaTunnel: Fatal Error, 

24/06/19 22:31:32 ERROR SeaTunnel: Please submit bug report in https://github.com/apache/seatunnel/issues

24/06/19 22:31:32 ERROR SeaTunnel: Reason:ErrorCode:[COMMON-22], ErrorDescription:[SeaTunnel read file 'v2.oracle2.conf' failed, because it not existed.] 

24/06/19 22:31:32 ERROR SeaTunnel: Exception StackTrace:org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[COMMON-22], ErrorDescription:[SeaTunnel read file 'v2.oracle2.conf' failed, because it not existed.]
    at org.apache.seatunnel.common.exception.CommonError.fileNotExistFailed(CommonError.java:78)
    at org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist(FileUtils.java:66)
    at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:50)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:63)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)

24/06/19 22:31:32 ERROR SeaTunnel: 
===============================================================================

24/06/19 22:31:32 INFO ShutdownHookManager: Shutdown hook called
24/06/19 22:31:32 INFO ShutdownHookManager: Deleting directory /tmp/spark-c5113a48-4ce5-496f-878c-0f6e8f069f65

Could you please give an example of running in cluster mode?

Carl-Zhou-CN commented 4 months ago

[screenshot] This is what I tried on YARN.

Carl-Zhou-CN commented 4 months ago

./bin/start-seatunnel-spark-3-connector-v2.sh --master yarn --deploy-mode cluster --executor-memory 4g --config config/v2.batch.config.template
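Expanded into a raw spark-submit, the same approach might look like the sketch below (paths and jar names are copied from the commands earlier in this thread; everything else is an assumption, not a verified command). The point is that in cluster mode the config file is shipped with --files and then referenced by its basename, since the driver runs inside a cluster container whose working directory receives the distributed files:

${SPARK_HOME}/bin/spark-submit \
  --class org.apache.seatunnel.core.starter.spark.SeaTunnelSpark \
  --master yarn \
  --deploy-mode cluster \
  --jars /home/zcp/seatunnelworkspace/seatunnel/lib/seatunnel-transforms-v2.jar,/home/zcp/seatunnelworkspace/seatunnel/lib/seatunnel-hadoop3-3.1.4-uber.jar,/home/zcp/seatunnelworkspace/seatunnel/lib/ojdbc8-23.4.0.24.05.jar,/home/zcp/seatunnelworkspace/seatunnel/connectors/connector-jdbc-2.3.5.jar \
  --files /home/zcp/seatunnelworkspace/seatunnel/config/v2.oracle2.conf \
  /home/zcp/seatunnelworkspace/seatunnel/starter/seatunnel-spark-3-starter.jar \
  --config v2.oracle2.conf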

Carl-Zhou-CN commented 4 months ago

It looks like --files did not take effect in your spark-submit above; the files were not copied to the appropriate place.
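As a hedged way to confirm that on a standalone cluster: in cluster deploy mode the driver is launched by one of the workers under its work/ directory, so listing that directory on the worker that ran the driver shows which files were actually staged next to it (the driver id below is a placeholder):

# On the worker node that launched the driver
ls ${SPARK_HOME}/work/                     # look for a driver-<timestamp>-<seq> directory
ls ${SPARK_HOME}/work/<driver-id>/         # check whether v2.oracle2.conf was staged here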

zhang5059T commented 4 months ago

Yes, no matter how I set it, the configuration file never takes effect. I'm not sure whether it can be submitted in cluster mode at all.

jiamin13579 commented 2 months ago

Yes, no matter how I set it, the configuration file never takes effect. I'm not sure whether it can be submitted in cluster mode at all.

Hi, did you solve it in the end? I also encountered the same problem.