apache / dolphinscheduler

Apache DolphinScheduler is the modern data orchestration platform. Agile to create high performance workflow with low-code
https://dolphinscheduler.apache.org/
Apache License 2.0
12.88k stars 4.63k forks source link

Running DS 3.2.1 and Spark 3.5.1 SQL failed #16409

Open haoranchuixue opened 3 months ago

haoranchuixue commented 3 months ago

Search before asking

What happened

After integrating Spark 3.5.1 with DolphinScheduler 3.2.1, executing Spark SQL tasks keeps reporting errors. However, DolphinScheduler 3.1.9 integrated with Spark 3.5.1 does not have this issue.

---- Error Log

[INFO] 2024-08-01 17:44:00.990 +0800 - ***

[INFO] 2024-08-01 17:44:00.993 +0800 - * Initialize task context ***

[INFO] 2024-08-01 17:44:00.993 +0800 - ***

[INFO] 2024-08-01 17:44:00.994 +0800 - Begin to initialize task

[INFO] 2024-08-01 17:44:00.994 +0800 - Set task startTime: 1722505440994

[INFO] 2024-08-01 17:44:00.994 +0800 - Set task appId: 1_2

[INFO] 2024-08-01 17:44:00.995 +0800 - End initialize task {

"taskInstanceId" : 2,

"taskName" : "t",

"firstSubmitTime" : 1722505440974,

"startTime" : 1722505440994,

"taskType" : "SPARK",

"workflowInstanceHost" : "172.17.16.214:5678",

"host" : "172.17.16.214:1234",

"logPath" : "/opt/whale/dolphinscheduler/worker-server/logs/20240801/14472195556640/2/1/2.log",

"processId" : 0,

"processDefineCode" : 14472195556640,

"processDefineVersion" : 2,

"processInstanceId" : 1,

"scheduleTime" : 0,

"executorId" : 2,

"cmdTypeIfComplement" : 7,

"tenantCode" : "prod",

"processDefineId" : 0,

"projectId" : 0,

"projectCode" : 14472188722592,

"taskParams" : "{\"localParams\":[],\"rawScript\":\"INSERT INTO lake_landing.test.t2 VALUES \r\n(5, '{\\"user\\":\\"chuixue\\", \\"city\\":\\"beijing\\"}'),\r\n(6, '{\\"user\\":\\"ningque\\", \\"city\\":\\"datang\\"}')\r\n;\",\"resourceList\":[],\"programType\":\"SQL\",\"mainClass\":\"\",\"deployMode\":\"client\",\"yarnQueue\":\"\",\"driverCores\":1,\"driverMemory\":\"512M\",\"numExecutors\":2,\"executorMemory\":\"2G\",\"executorCores\":2,\"sqlExecutionType\":\"SCRIPT\"}",

"prepareParamsMap" : {

"system.task.definition.name" : {

  "prop" : "system.task.definition.name",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "t"

},

"system.project.name" : {

  "prop" : "system.project.name",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : null

},

"system.project.code" : {

  "prop" : "system.project.code",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "14472188722592"

},

"system.workflow.instance.id" : {

  "prop" : "system.workflow.instance.id",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "1"

},

"system.biz.curdate" : {

  "prop" : "system.biz.curdate",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "20240801"

},

"system.biz.date" : {

  "prop" : "system.biz.date",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "20240731"

},

"system.task.instance.id" : {

  "prop" : "system.task.instance.id",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "2"

},

"system.workflow.definition.name" : {

  "prop" : "system.workflow.definition.name",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "wf-t"

},

"system.task.definition.code" : {

  "prop" : "system.task.definition.code",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "14472189546528"

},

"system.workflow.definition.code" : {

  "prop" : "system.workflow.definition.code",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "14472195556640"

},

"system.datetime" : {

  "prop" : "system.datetime",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "20240801174400"

}

},

"taskAppId" : "1_2",

"taskTimeout" : 2147483647,

"workerGroup" : "default",

"delayTime" : 0,

"currentExecutionStatus" : "SUBMITTED_SUCCESS",

"endTime" : 0,

"dryRun" : 0,

"paramsMap" : { },

"cpuQuota" : -1,

"memoryMax" : -1,

"testFlag" : 0,

"logBufferEnable" : false,

"dispatchFailTimes" : 0

}

[INFO] 2024-08-01 17:44:00.996 +0800 - ***

[INFO] 2024-08-01 17:44:00.996 +0800 - Load task instance plugin

[INFO] 2024-08-01 17:44:00.996 +0800 - ***

[INFO] 2024-08-01 17:44:01.021 +0800 - Send task status RUNNING_EXECUTION master: 172.17.16.214:1234

[INFO] 2024-08-01 17:44:01.022 +0800 - TenantCode: prod check successfully

[INFO] 2024-08-01 17:44:01.023 +0800 - WorkflowInstanceExecDir: /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2 check successfully

[INFO] 2024-08-01 17:44:01.023 +0800 - Create TaskChannel: org.apache.dolphinscheduler.plugin.task.spark.SparkTaskChannel successfully

[INFO] 2024-08-01 17:44:01.023 +0800 - Download resources successfully:

ResourceContext(resourceItemMap={})

[INFO] 2024-08-01 17:44:01.024 +0800 - Download upstream files: [] successfully

[INFO] 2024-08-01 17:44:01.024 +0800 - Task plugin instance: SPARK create successfully

[INFO] 2024-08-01 17:44:01.024 +0800 - Initialize spark task params {

"localParams" : [ ],

"varPool" : null,

"mainJar" : null,

"mainClass" : "",

"deployMode" : "client",

"mainArgs" : null,

"driverCores" : 1,

"driverMemory" : "512M",

"numExecutors" : 2,

"executorCores" : 2,

"executorMemory" : "2G",

"appName" : null,

"yarnQueue" : "",

"others" : null,

"programType" : "SQL",

"rawScript" : "INSERT INTO lake_landing.test.t2 VALUES \r\n(5, '{\"user\":\"chuixue\", \"city\":\"beijing\"}'),\r\n(6, '{\"user\":\"ningque\", \"city\":\"datang\"}')\r\n;",

"namespace" : null,

"resourceList" : [ ],

"sqlExecutionType" : "SCRIPT"

}

[INFO] 2024-08-01 17:44:01.024 +0800 - Success initialized task plugin instance successfully

[INFO] 2024-08-01 17:44:01.024 +0800 - Set taskVarPool: null successfully

[INFO] 2024-08-01 17:44:01.024 +0800 - ***

[INFO] 2024-08-01 17:44:01.024 +0800 - * Execute task instance *****

[INFO] 2024-08-01 17:44:01.024 +0800 - ***

[INFO] 2024-08-01 17:44:01.025 +0800 - raw script : INSERT INTO lake_landing.test.t2 VALUES

(5, '{"user":"chuixue", "city":"beijing"}'),

(6, '{"user":"ningque", "city":"datang"}')

;

[INFO] 2024-08-01 17:44:01.025 +0800 - task execute path : /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2

[INFO] 2024-08-01 17:44:01.026 +0800 - Final Shell file is:

[INFO] 2024-08-01 17:44:01.026 +0800 - ** Script Content *****

[INFO] 2024-08-01 17:44:01.026 +0800 - #!/bin/bash

BASEDIR=$(cd dirname $0; pwd)

cd $BASEDIR

${SPARK_HOME}/bin/spark-sql --master yarn --deploy-mode client --conf spark.driver.cores=1 --conf spark.driver.memory=512M --conf spark.executor.instances=2 --conf spark.executor.cores=2 --conf spark.executor.memory=2G -f /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2/1_2_node.sql

[INFO] 2024-08-01 17:44:01.026 +0800 - ** Script Content *****

[INFO] 2024-08-01 17:44:01.026 +0800 - Executing shell command : sudo -u prod -i /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2/1_2.sh

[INFO] 2024-08-01 17:44:01.033 +0800 - process start, process id is: 2704

[INFO] 2024-08-01 17:44:04.033 +0800 - ->

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/usr/bigtop/3.2.0/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/usr/bigtop/3.2.0/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

24/08/01 17:44:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[INFO] 2024-08-01 17:44:05.035 +0800 - ->

24/08/01 17:44:04 WARN HiveConf: HiveConf of name hive.load.data.owner does not exist

[INFO] 2024-08-01 17:44:06.035 +0800 - ->

24/08/01 17:44:05 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.

[INFO] 2024-08-01 17:44:12.036 +0800 - ->

24/08/01 17:44:11 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.

java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = -2184441956866814275, local class serialVersionUID = -3992716321891270988

    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)

    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)

    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)

    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)

    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)

    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)

    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)

    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)

    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)

    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)

    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)

    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)

    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:299)

    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)

    at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:352)

    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:298)

    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)

    at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:298)

    at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:646)

    at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697)

    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:689)

    at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274)

    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)

    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)

    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)

    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)

    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)

    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)

    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)

    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)

    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)

    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)

    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)

    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)

    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)

    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)

    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)

    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

    at java.lang.Thread.run(Thread.java:748)

[INFO] 2024-08-01 17:44:13.037 +0800 - ->

24/08/01 17:44:12 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!

[INFO] 2024-08-01 17:44:37.039 +0800 - ->

24/08/01 17:44:36 WARN SQLConf: The SQL config 'spark.sql.adaptive.coalescePartitions.minPartitionNum' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.adaptive.coalescePartitions.minPartitionSize' instead.

24/08/01 17:44:36 WARN SQLConf: The SQL config 'spark.sql.adaptive.coalescePartitions.minPartitionNum' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.adaptive.coalescePartitions.minPartitionSize' instead.

24/08/01 17:44:36 WARN SQLConf: The SQL config 'spark.sql.adaptive.coalescePartitions.minPartitionNum' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.adaptive.coalescePartitions.minPartitionSize' instead.

24/08/01 17:44:36 WARN SQLConf: The SQL config 'spark.sql.adaptive.coalescePartitions.minPartitionNum' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.adaptive.coalescePartitions.minPartitionSize' instead.

[INFO] 2024-08-01 17:44:38.040 +0800 - ->

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.heapsize does not exist

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.hook.proto.base-directory does not exist

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.thrift.support.proxyuser does not exist

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.strict.managed.tables does not exist

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.stats.fetch.partition.stats does not exist

24/08/01 17:44:37 WARN HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic

Hive Session ID = d50e7ee4-ddb5-4fa1-816a-841f17b8c7b0

[INFO] 2024-08-01 17:44:39.040 +0800 - ->

Spark master: yarn, Application Id: application_1722383649870_0024

[INFO] 2024-08-01 17:44:40.041 +0800 - ->

24/08/01 17:44:39 WARN HiveConf: HiveConf of name hive.load.data.owner does not exist

Error in query: spark_catalog requires a single-part namespace, but got [lake_landing, test]

24/08/01 17:44:39 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!

[INFO] 2024-08-01 17:44:40.041 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2, processId:2704 ,exitStatusCode:1 ,processWaitForStatus:true ,processExitValue:1

[INFO] 2024-08-01 17:44:40.042 +0800 - Start finding appId in /opt/whale/dolphinscheduler/worker-server/logs/20240801/14472195556640/2/1/2.log, fetch way: log

[INFO] 2024-08-01 17:44:40.042 +0800 - Find appId: application_1722383649870_0024 from /opt/whale/dolphinscheduler/worker-server/logs/20240801/14472195556640/2/1/2.log

[INFO] 2024-08-01 17:44:40.043 +0800 - ***

[INFO] 2024-08-01 17:44:40.043 +0800 - ***** Finalize task instance ****

[INFO] 2024-08-01 17:44:40.043 +0800 - ***

[INFO] 2024-08-01 17:44:40.043 +0800 - Upload output files: [] successfully

[INFO] 2024-08-01 17:44:40.049 +0800 - Send task execute status: FAILURE to master : 172.17.16.214:1234

[INFO] 2024-08-01 17:44:40.050 +0800 - Remove the current task execute context from worker cache

[INFO] 2024-08-01 17:44:40.050 +0800 - The current execute mode isn't develop mode, will clear the task execute file: /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2

[INFO] 2024-08-01 17:44:40.050 +0800 - Success clear the task execute file: /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2

[INFO] 2024-08-01 17:44:40.051 +0800 - FINALIZE_SESSION

What you expected to happen

Expected the Spark SQL task to execute successfully, as it does under DolphinScheduler 3.1.9.

How to reproduce

After integrating Spark 3.5.1 (with Paimon 0.8) with DolphinScheduler 3.2.1, adding a Spark SQL task and executing the following statements results in an error.

USE landing_paimon.test; create table t2 ( k int, v string ) tblproperties ( 'primary-key' = 'k' ); INSERT INTO t2 VALUES (1, '{"user":"haoran", "city":"beijing"}'), (2, '{"user":"浩然吹雪", "city":"大秦"}') ;

Anything else

No response

Version

3.2.x

Are you willing to submit PR?

Code of Conduct

github-actions[bot] commented 3 months ago

Search before asking

What happened

After integrating Spark 3.5.1 with DolphinScheduler 3.2.1, executing Spark SQL tasks keeps reporting errors. However, DolphinScheduler 3.1.9 integrated with Spark 3.5.1 does not have this issue.

---- Error Log

[INFO] 2024-08-01 17:44:00.990 +0800 - ***

[INFO] 2024-08-01 17:44:00.993 +0800 - * Initialize task context ***

[INFO] 2024-08-01 17:44:00.993 +0800 - ***

[INFO] 2024-08-01 17:44:00.994 +0800 - Begin to initialize task

[INFO] 2024-08-01 17:44:00.994 +0800 - Set task startTime: 1722505440994

[INFO] 2024-08-01 17:44:00.994 +0800 - Set task appId: 1_2

[INFO] 2024-08-01 17:44:00.995 +0800 - End initialize task {

"taskInstanceId" : 2,

"taskName" : "t",

"firstSubmitTime" : 1722505440974,

"startTime" : 1722505440994,

"taskType" : "SPARK",

"workflowInstanceHost" : "172.17.16.214:5678",

"host" : "172.17.16.214:1234",

"logPath" : "/opt/whale/dolphinscheduler/worker-server/logs/20240801/14472195556640/2/1/2.log",

"processId" : 0,

"processDefineCode" : 14472195556640,

"processDefineVersion" : 2,

"processInstanceId" : 1,

"scheduleTime" : 0,

"executorId" : 2,

"cmdTypeIfComplement" : 7,

"tenantCode" : "prod",

"processDefineId" : 0,

"projectId" : 0,

"projectCode" : 14472188722592,

"taskParams" : "{\"localParams\":[],\"rawScript\":\"INSERT INTO lake_landing.test.t2 VALUES \r\n(5, '{\\"user\\":\\"chuixue\\", \\"city\\":\\"beijing\\"}'),\r\n(6, '{\\"user\\":\\"ningque\\", \\"city\\":\\"datang\\"}')\r\n;\",\"resourceList\":[],\"programType\":\"SQL\",\"mainClass\":\"\",\"deployMode\":\"client\",\"yarnQueue\":\"\",\"driverCores\":1,\"driverMemory\":\"512M\",\"numExecutors\":2,\"executorMemory\":\"2G\",\"executorCores\":2,\"sqlExecutionType\":\"SCRIPT\"}",

"prepareParamsMap" : {

"system.task.definition.name" : {

  "prop" : "system.task.definition.name",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "t"

},

"system.project.name" : {

  "prop" : "system.project.name",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : null

},

"system.project.code" : {

  "prop" : "system.project.code",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "14472188722592"

},

"system.workflow.instance.id" : {

  "prop" : "system.workflow.instance.id",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "1"

},

"system.biz.curdate" : {

  "prop" : "system.biz.curdate",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "20240801"

},

"system.biz.date" : {

  "prop" : "system.biz.date",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "20240731"

},

"system.task.instance.id" : {

  "prop" : "system.task.instance.id",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "2"

},

"system.workflow.definition.name" : {

  "prop" : "system.workflow.definition.name",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "wf-t"

},

"system.task.definition.code" : {

  "prop" : "system.task.definition.code",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "14472189546528"

},

"system.workflow.definition.code" : {

  "prop" : "system.workflow.definition.code",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "14472195556640"

},

"system.datetime" : {

  "prop" : "system.datetime",

  "direct" : "IN",

  "type" : "VARCHAR",

  "value" : "20240801174400"

}

},

"taskAppId" : "1_2",

"taskTimeout" : 2147483647,

"workerGroup" : "default",

"delayTime" : 0,

"currentExecutionStatus" : "SUBMITTED_SUCCESS",

"endTime" : 0,

"dryRun" : 0,

"paramsMap" : { },

"cpuQuota" : -1,

"memoryMax" : -1,

"testFlag" : 0,

"logBufferEnable" : false,

"dispatchFailTimes" : 0

}

[INFO] 2024-08-01 17:44:00.996 +0800 - ***

[INFO] 2024-08-01 17:44:00.996 +0800 - Load task instance plugin

[INFO] 2024-08-01 17:44:00.996 +0800 - ***

[INFO] 2024-08-01 17:44:01.021 +0800 - Send task status RUNNING_EXECUTION master: 172.17.16.214:1234

[INFO] 2024-08-01 17:44:01.022 +0800 - TenantCode: prod check successfully

[INFO] 2024-08-01 17:44:01.023 +0800 - WorkflowInstanceExecDir: /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2 check successfully

[INFO] 2024-08-01 17:44:01.023 +0800 - Create TaskChannel: org.apache.dolphinscheduler.plugin.task.spark.SparkTaskChannel successfully

[INFO] 2024-08-01 17:44:01.023 +0800 - Download resources successfully:

ResourceContext(resourceItemMap={})

[INFO] 2024-08-01 17:44:01.024 +0800 - Download upstream files: [] successfully

[INFO] 2024-08-01 17:44:01.024 +0800 - Task plugin instance: SPARK create successfully

[INFO] 2024-08-01 17:44:01.024 +0800 - Initialize spark task params {

"localParams" : [ ],

"varPool" : null,

"mainJar" : null,

"mainClass" : "",

"deployMode" : "client",

"mainArgs" : null,

"driverCores" : 1,

"driverMemory" : "512M",

"numExecutors" : 2,

"executorCores" : 2,

"executorMemory" : "2G",

"appName" : null,

"yarnQueue" : "",

"others" : null,

"programType" : "SQL",

"rawScript" : "INSERT INTO lake_landing.test.t2 VALUES \r\n(5, '{\"user\":\"chuixue\", \"city\":\"beijing\"}'),\r\n(6, '{\"user\":\"ningque\", \"city\":\"datang\"}')\r\n;",

"namespace" : null,

"resourceList" : [ ],

"sqlExecutionType" : "SCRIPT"

}

[INFO] 2024-08-01 17:44:01.024 +0800 - Success initialized task plugin instance successfully

[INFO] 2024-08-01 17:44:01.024 +0800 - Set taskVarPool: null successfully

[INFO] 2024-08-01 17:44:01.024 +0800 - ***

[INFO] 2024-08-01 17:44:01.024 +0800 - * Execute task instance *****

[INFO] 2024-08-01 17:44:01.024 +0800 - ***

[INFO] 2024-08-01 17:44:01.025 +0800 - raw script : INSERT INTO lake_landing.test.t2 VALUES

(5, '{"user":"chuixue", "city":"beijing"}'),

(6, '{"user":"ningque", "city":"datang"}')

;

[INFO] 2024-08-01 17:44:01.025 +0800 - task execute path : /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2

[INFO] 2024-08-01 17:44:01.026 +0800 - Final Shell file is:

[INFO] 2024-08-01 17:44:01.026 +0800 - ** Script Content *****

[INFO] 2024-08-01 17:44:01.026 +0800 - #!/bin/bash

BASEDIR=$(cd dirname $0; pwd)

cd $BASEDIR

${SPARK_HOME}/bin/spark-sql --master yarn --deploy-mode client --conf spark.driver.cores=1 --conf spark.driver.memory=512M --conf spark.executor.instances=2 --conf spark.executor.cores=2 --conf spark.executor.memory=2G -f /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2/1_2_node.sql

[INFO] 2024-08-01 17:44:01.026 +0800 - ** Script Content *****

[INFO] 2024-08-01 17:44:01.026 +0800 - Executing shell command : sudo -u prod -i /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2/1_2.sh

[INFO] 2024-08-01 17:44:01.033 +0800 - process start, process id is: 2704

[INFO] 2024-08-01 17:44:04.033 +0800 - ->

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/usr/bigtop/3.2.0/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/usr/bigtop/3.2.0/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

24/08/01 17:44:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[INFO] 2024-08-01 17:44:05.035 +0800 - ->

24/08/01 17:44:04 WARN HiveConf: HiveConf of name hive.load.data.owner does not exist

[INFO] 2024-08-01 17:44:06.035 +0800 - ->

24/08/01 17:44:05 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.

[INFO] 2024-08-01 17:44:12.036 +0800 - ->

24/08/01 17:44:11 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.

java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = -2184441956866814275, local class serialVersionUID = -3992716321891270988

    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)

    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)

    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)

    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)

    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)

    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)

    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)

    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)

    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)

    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)

    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)

    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)

    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:299)

    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)

    at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:352)

    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:298)

    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)

    at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:298)

    at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:646)

    at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697)

    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:689)

    at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274)

    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)

    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)

    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)

    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)

    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)

    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)

    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)

    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)

    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)

    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)

    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)

    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)

    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)

    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)

    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)

    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)

    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)

    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

    at java.lang.Thread.run(Thread.java:748)

[INFO] 2024-08-01 17:44:13.037 +0800 - ->

24/08/01 17:44:12 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!

[INFO] 2024-08-01 17:44:37.039 +0800 - ->

24/08/01 17:44:36 WARN SQLConf: The SQL config 'spark.sql.adaptive.coalescePartitions.minPartitionNum' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.adaptive.coalescePartitions.minPartitionSize' instead.

24/08/01 17:44:36 WARN SQLConf: The SQL config 'spark.sql.adaptive.coalescePartitions.minPartitionNum' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.adaptive.coalescePartitions.minPartitionSize' instead.

24/08/01 17:44:36 WARN SQLConf: The SQL config 'spark.sql.adaptive.coalescePartitions.minPartitionNum' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.adaptive.coalescePartitions.minPartitionSize' instead.

24/08/01 17:44:36 WARN SQLConf: The SQL config 'spark.sql.adaptive.coalescePartitions.minPartitionNum' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.adaptive.coalescePartitions.minPartitionSize' instead.

[INFO] 2024-08-01 17:44:38.040 +0800 - ->

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.heapsize does not exist

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.hook.proto.base-directory does not exist

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.thrift.support.proxyuser does not exist

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.strict.managed.tables does not exist

24/08/01 17:44:37 WARN HiveConf: HiveConf of name hive.stats.fetch.partition.stats does not exist

24/08/01 17:44:37 WARN HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic

Hive Session ID = d50e7ee4-ddb5-4fa1-816a-841f17b8c7b0

[INFO] 2024-08-01 17:44:39.040 +0800 - ->

Spark master: yarn, Application Id: application_1722383649870_0024

[INFO] 2024-08-01 17:44:40.041 +0800 - ->

24/08/01 17:44:39 WARN HiveConf: HiveConf of name hive.load.data.owner does not exist

Error in query: spark_catalog requires a single-part namespace, but got [lake_landing, test]

24/08/01 17:44:39 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!

[INFO] 2024-08-01 17:44:40.041 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2, processId:2704 ,exitStatusCode:1 ,processWaitForStatus:true ,processExitValue:1

[INFO] 2024-08-01 17:44:40.042 +0800 - Start finding appId in /opt/whale/dolphinscheduler/worker-server/logs/20240801/14472195556640/2/1/2.log, fetch way: log

[INFO] 2024-08-01 17:44:40.042 +0800 - Find appId: application_1722383649870_0024 from /opt/whale/dolphinscheduler/worker-server/logs/20240801/14472195556640/2/1/2.log

[INFO] 2024-08-01 17:44:40.043 +0800 - ***

[INFO] 2024-08-01 17:44:40.043 +0800 - ***** Finalize task instance ****

[INFO] 2024-08-01 17:44:40.043 +0800 - ***

[INFO] 2024-08-01 17:44:40.043 +0800 - Upload output files: [] successfully

[INFO] 2024-08-01 17:44:40.049 +0800 - Send task execute status: FAILURE to master : 172.17.16.214:1234

[INFO] 2024-08-01 17:44:40.050 +0800 - Remove the current task execute context from worker cache

[INFO] 2024-08-01 17:44:40.050 +0800 - The current execute mode isn't develop mode, will clear the task execute file: /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2

[INFO] 2024-08-01 17:44:40.050 +0800 - Success clear the task execute file: /tmp/dolphinscheduler/exec/process/prod/14472188722592/14472195556640_2/1/2

[INFO] 2024-08-01 17:44:40.051 +0800 - FINALIZE_SESSION

What you expected to happen

Expected the Spark SQL task to execute successfully, as it does under DolphinScheduler 3.1.9.

How to reproduce

After integrating Spark 3.5.1 (with Paimon 0.8) with DolphinScheduler 3.2.1, adding a Spark SQL task and executing the following statements results in an error.

USE landing_paimon.test; create table t2 ( k int, v string ) tblproperties ( 'primary-key' = 'k' ); INSERT INTO t2 VALUES (1, '{"user":"haoran", "city":"beijing"}'), (2, '{"user":"浩然吹雪", "city":"大秦"}') ;

Anything else

No response

Version

3.2.x

Are you willing to submit PR?

Code of Conduct