apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] Doris data synchronization error #6909

Open: zxc2817427 opened this issue 1 month ago

zxc2817427 commented 1 month ago

What happened

When synchronizing from Oracle to Doris, the job always fails after roughly 37 million rows have been synchronized. I tried three times and got the same error each time.

SeaTunnel Version

dev

SeaTunnel Config

env {
  parallelism = 10
  job.mode = "BATCH"
  # "全量同步" means "full synchronization"
  job.name = "全量同步test"
}

source {
  Jdbc {
    url = "jdbc:oracle:thin:@0.0.0.0:1521/test"
    driver = "oracle.jdbc.OracleDriver"
    # credentials were left blank (redacted) in the report
    user = ""
    password = ""
    query = "select * from test"
    split.size = 200000
  }
}

sink {
  Doris {
    # FE address and credentials were left blank (redacted) in the report
    fenodes = ""
    username = ""
    password = ""
    database = "doris_db"
    table = "fkdb"
    sink.label-prefix = "fkdb_prefix"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

Running Command

sh seatunnel.sh --config ../config/doris.conf

Error Exception

java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:57) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:194) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:135) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:52) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:62) ~[seatunnel-transforms-v2.jar:2.3.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_201]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_201]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_201]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_201]
    at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403) ~[?:1.8.0_201]
    at org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.write(RecordBuffer.java:93) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:55) ~[?:?]
    ... 9 more
2024-05-24 17:45:19,740 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=846314587102904321, pipelineId=1, taskGroupId=30002}] - [localhost]:5801 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@daa53bb
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) ~[seatunnel-starter.jar:2.3.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_201]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_201]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:121) ~[seatunnel-transforms-v2.jar:2.3.5]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:158) ~[seatunnel-transforms-v2.jar:2.3.5]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:43) ~[seatunnel-transforms-v2.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:252) ~[seatunnel-starter.jar:2.3.5]
    ... 16 more
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:57) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:194) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:135) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:52) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:62) ~[seatunnel-transforms-v2.jar:2.3.5]
    ... 5 more
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_201]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_201]
    at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403) ~[?:1.8.0_201]
    at org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.write(RecordBuffer.java:93) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:55) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:194) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:135) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:52) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:62) ~[seatunnel-transforms-v2.jar:2.3.5]
    ... 5 more
2024-05-24 17:45:19,740 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=846314587102904321, pipelineId=1, taskGroupId=30002}] - [localhost]:5801 [seatunnel] [5.1] taskDone, taskId = 50002, taskGroup = TaskGroupLocation{jobId=846314587102904321, pipelineId=1, taskGroupId=30002}
2024-05-24 17:45:19,742 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=846314587102904321, pipelineId=1, taskGroupId=30002}] - [localhost]:5801 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@7d131e49
java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:41) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:46) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:28) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:204) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:119) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader.pollNext(JdbcSourceReader.java:70) ~[?:?]
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) ~[seatunnel-starter.jar:2.3.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_201]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_201]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220) ~[?:1.8.0_201]
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) ~[?:1.8.0_201]
    at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:350) ~[?:1.8.0_201]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:39) ~[seatunnel-starter.jar:2.3.5]
    ... 16 more
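Every exception chain above bottoms out in a thread that was interrupted while blocked on a java.util.concurrent.ArrayBlockingQueue: the Doris writer in take() inside RecordBuffer.write, and the JDBC source task in put() inside IntermediateBlockingQueue.handleRecord. The minimal sketch below uses no SeaTunnel code; it only reproduces that shape with the JDK. The names writeRecord, freeBuffers and runDemo are hypothetical, and treating the interrupt as a task cancellation is an assumption. If that assumption holds, these traces would record the cancellation rather than the original failure, so an earlier error in the log may be more informative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptedTakeDemo {

    // Hypothetical stand-in for a sink writer that must obtain a free buffer
    // from a bounded pool before it can append a record.
    static void writeRecord(BlockingQueue<StringBuilder> freeBuffers, String record) {
        try {
            StringBuilder buffer = freeBuffers.take(); // blocks while the pool is empty
            buffer.append(record);
        } catch (InterruptedException e) {
            // Restore the interrupt flag, then wrap the checked exception. This
            // produces the "RuntimeException: java.lang.InterruptedException" shape.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Starts a writer against an empty pool, interrupts it the way a task
    // cancellation might (assumption), and returns what the writer threw.
    static Throwable runDemo() {
        BlockingQueue<StringBuilder> freeBuffers = new ArrayBlockingQueue<>(1); // capacity 1, initially empty
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread writer = new Thread(() -> {
            try {
                writeRecord(freeBuffers, "row-1");
            } catch (RuntimeException e) {
                failure.set(e);
            }
        }, "demo-writer");
        writer.start();

        try {
            // Let the writer reach take(); if it has not yet, take() still throws
            // immediately because the interrupt flag is already set.
            Thread.sleep(200);
            writer.interrupt();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo was interrupted", e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable t = runDemo();
        System.out.println(t);                         // java.lang.RuntimeException: java.lang.InterruptedException
        System.out.println("cause: " + t.getCause()); // cause: java.lang.InterruptedException
    }
}
```

ArrayBlockingQueue.put() reacts to interruption in the same way, which matches the source-side trace that ends in IntermediateBlockingQueue.handleRecord.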

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8.0_201

Screenshots

No response

luckyliush commented 1 month ago

Which version of seatunnel are you using?

zxc2817427 commented 1 month ago

In reply to "Which version of seatunnel are you using?": 2.3.5

github-actions[bot] commented 3 days ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.