apache / seatunnel

SeaTunnel is a next-generation, super-high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Doris Sink] java.lang.NullPointerException #5193

Closed · dongsilun closed this 1 year ago

dongsilun commented 1 year ago

What happened

I created a test batch job [Hive -> Doris]. It fails with a java.lang.NullPointerException in DorisSinkWriter.

2023-08-01 17:08:30,489 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (myhost executor 1): java.lang.NullPointerException
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.createSerializer(DorisSinkWriter.java:271)
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.<init>(DorisSinkWriter.java:94)
    at org.apache.seatunnel.connectors.doris.sink.DorisSink.createWriter(DorisSink.java:103)
    ... (full stack trace in the Error Exception section below)
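The trace points at DorisSinkWriter.createSerializer, which runs while the writer is being constructed, i.e. before any row is written. As a hypothetical sketch of how an unset format option in doris.config can surface as exactly this kind of NPE (illustrative only; this is not the actual SeaTunnel source, and the class and method names below are made up):

// Hypothetical sketch only -- not the real DorisSinkWriter code.
import java.util.Properties;

public class SerializerNpeDemo {

    static Object createSerializer(Properties streamLoadProps) {
        // Returns null when doris.config has no "format" entry.
        String format = streamLoadProps.getProperty("format");
        switch (format) { // switching on a null String throws NullPointerException
            case "json": return "json serializer placeholder";
            case "csv":  return "csv serializer placeholder";
            default: throw new IllegalArgumentException("unsupported format: " + format);
        }
    }

    public static void main(String[] args) {
        // As in this report: doris.config sets "columns" but not "format".
        Properties props = new Properties();
        props.setProperty("columns", "id,str1");
        createSerializer(props); // throws java.lang.NullPointerException
    }
}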

SeaTunnel Version

2.3.2

SeaTunnel Config

env {
  execution.parallelism = 3
  job.mode = "BATCH"
}

source {
  Hive {
    preSql = "select * from hive2doris"
    table_name = "test.hive2doris"
    metastore_uri = "thrift://xxxx:9083"
    hdfs_site_path = "/home/op/software/hadoop-3.3.2/etc/hadoop/hdfs-site.xml"
  }
}

sink {
    Doris {
        fenodes = "xxxx:8030"
        username = root
        password = ""
        table.identifier = "test.hive2doris"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_row"
        doris.config = {
            columns = "id,str1,str2,str3,str4,str5,str6,str7,str8,str9,str10,num1,num2,num3,num4,num5,num6,num7,num8,num9"
        }
    }
}

Running Command

$SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master yarn --config hive2doris.conf

Error Exception

java.lang.NullPointerException
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.createSerializer(DorisSinkWriter.java:271)
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.<init>(DorisSinkWriter.java:94)
    at org.apache.seatunnel.connectors.doris.sink.DorisSink.createWriter(DorisSink.java:103)
    at org.apache.seatunnel.translation.spark.sink.write.SeaTunnelSparkDataWriterFactory.createWriter(SeaTunnelSparkDataWriterFactory.java:49)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:407)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
...
org.apache.spark.SparkException: Writing job aborted
    at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:613)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:386)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:330)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:236)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:309)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:308)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:236)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:311)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:125)
    at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:74)
    at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Zeta or Flink or Spark Version

Spark 3.2.2

Java or Scala Version

JDK 1.8

dongsilun commented 1 year ago

Hive table

create external table hive2doris (
    id int,
    str1 string, str2 string, str3 string, str4 string, str5 string,
    str6 string, str7 string, str8 string, str9 string, str10 string,
    num1 int, num2 int, num3 int, num4 int, num5 int,
    num6 int, num7 int, num8 int, num9 int
)
stored as orc
location '/hive_test/hive2doris';

Doris table

CREATE TABLE hive2doris (
    id int(10) NOT NULL,
    str1 varchar(100) NOT NULL,
    str2 varchar(100) NOT NULL,
    .........,
    num8 int(10) NOT NULL,
    num9 int(10) NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(id, str1)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3"
);

Carl-Zhou-CN commented 1 year ago

@dongsilun You need to add the following to doris.config; the format option is mandatory:

doris.config = {
    format = "json"
    read_json_by_line = "true"
}
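For reference, here is the sink block from this report with the suggested options merged in (a sketch only; the fenodes host stays a placeholder and the columns list is unchanged from the original config):

sink {
    Doris {
        fenodes = "xxxx:8030"
        username = root
        password = ""
        table.identifier = "test.hive2doris"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_row"
        doris.config = {
            format = "json"
            read_json_by_line = "true"
            columns = "id,str1,str2,str3,str4,str5,str6,str7,str8,str9,str10,num1,num2,num3,num4,num5,num6,num7,num8,num9"
        }
    }
}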

dongsilun commented 1 year ago

Thank you.