apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] Spark 3.5 only: submit spark job failed with "java.lang.IncompatibleClassChangeError" error #6514

Open GangLiCN opened 8 months ago

GangLiCN commented 8 months ago

Search before asking

What happened

Spark job failed with "java.lang.IncompatibleClassChangeError" error.

SeaTunnel Version

2.3.4

SeaTunnel Config

env {
  # You can set SeaTunnel environment configuration here
  parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }

  # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/category/source-v2
}

sink {
  Console {
  }

  # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/category/sink-v2
}

Running Command

start-seatunnel-spark-3-connector-v2.cmd --config v2.batch.config.template -m local -e client

Error Exception

Exception in thread "main" java.lang.IncompatibleClassChangeError: Conflicting default methods: org/apache/spark/sql/connector/write/BatchWrite.useCommitCoordinator org/apache/spark/sql/connector/write/streaming/StreamingWrite.useCommitCoordinator
        at org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite.useCommitCoordinator(SeaTunnelBatchWrite.java)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:374)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:359)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:225)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:337)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:336)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:312)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
        at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:155)
        at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:71)
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Zeta or Flink or Spark Version

Spark 3.5.1

Java or Scala Version

java version "1.8.0_391"

Screenshots


Are you willing to submit PR?

Code of Conduct

GangLiCN commented 8 months ago

This job runs successfully under other Spark 3.x versions; it only fails on Spark 3.5.

Relevant log from a successful run under Spark 3.4.x:

......
24/03/15 11:07:43 INFO FakeSourceReader: 16 rows of data have been generated in split(fake_1) for table fake. Generation time: 1710472063391
24/03/15 11:07:43 INFO FakeSourceReader: Closed the bounded fake source
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=1: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : YAAbZ, 1634054148
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=2: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : eGMQE, 147352027
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=3: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : sJbSP, 1787222905
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=4: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : CalIH, 313452708
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=5: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : jTDea, 70559322
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=6: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : EdnkE, 1421819677
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=7: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : cNsDT, 1018830373
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=8: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : DKRDX, 1659072634
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=9: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : ZnYMk, 838843086
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=10: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : iYgKs, 1177053274
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=11: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : fQoAM, 393998761
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=12: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : ZBkgo, 1779849525
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=13: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : zEARW, 160078189
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=14: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : sBuQx, 1542831853
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=15: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : czRbH, 415969264
24/03/15 11:07:43 INFO ConsoleSinkWriter: subtaskIndex=1 rowIndex=16: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : iPeFj, 830278786
24/03/15 11:07:43 INFO DataWritingSparkTask: Commit authorized for partition 1 (task 1, attempt 0, stage 0.0)
24/03/15 11:07:43 INFO DataWritingSparkTask: Committed partition 1 (task 1, attempt 0, stage 0.0)
24/03/15 11:07:43 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1472 bytes result sent to driver
......

FROM332211 commented 7 months ago

I hit the same problem when integrating SeaTunnel with Spark 3.5.1. SeaTunnel's dependency is on Spark 3.3.0, and this part of the API changed in Spark 3.5.1. I downloaded the source code, changed the Spark version in the pom dependency to 3.5.1, modified one or two places in the code, and then the job ran successfully.
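
For context on what such a change involves: the error message shows that both BatchWrite and StreamingWrite now declare a default useCommitCoordinator(), and SeaTunnelBatchWrite implements both interfaces, so at runtime the JVM finds two conflicting inherited defaults and throws IncompatibleClassChangeError. The standard Java fix is to override the method explicitly. Below is a minimal sketch of that idea, built against the Spark 3.5 API; the class name is illustrative and this is not the actual SeaTunnel patch.

import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;

// Minimal sketch (illustrative class name, not the real SeaTunnel code): a class
// that inherits conflicting default methods from two interfaces must override the
// method itself and pick one of the defaults explicitly.
public abstract class SeaTunnelBatchWriteSketch implements BatchWrite, StreamingWrite {

    // Resolve the clash by delegating to BatchWrite's default, which keeps the
    // pre-3.5 behavior of using the commit coordinator for batch writes.
    @Override
    public boolean useCommitCoordinator() {
        return BatchWrite.super.useCommitCoordinator();
    }
}

Rebuilding the translation module against Spark 3.5 with an override like this, plus the pom version bump discussed below, is presumably what "modified one or two places in the code" amounts to.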

WZH-hub commented 1 month ago

> I hit the same problem when integrating SeaTunnel with Spark 3.5.1. SeaTunnel's dependency is on Spark 3.3.0, and this part of the API changed in Spark 3.5.1. I downloaded the source code, changed the Spark version in the pom dependency to 3.5.1, modified one or two places in the code, and then the job ran successfully.

Did you replace the Spark version only inside seatunnel-translation-spark-3.3, or did you replace the Spark version globally?

WZH-hub commented 1 month ago

> I hit the same problem when integrating SeaTunnel with Spark 3.5.1. SeaTunnel's dependency is on Spark 3.3.0, and this part of the API changed in Spark 3.5.1. I downloaded the source code, changed the Spark version in the pom dependency to 3.5.1, modified one or two places in the code, and then the job ran successfully.
>
> Did you replace the Spark version only inside seatunnel-translation-spark-3.3, or did you replace the Spark version globally?

For now I only changed the Spark version to 3.5.0 in the seatunnel-translation-spark-3.3 pom, and that worked.