apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

Mysql-CDC -> Doris Sink [BUG] #7050

Closed BigdataSurvey closed 2 months ago

BigdataSurvey commented 4 months ago


What happened


When extracting data from MySQL to Doris, I run into two kinds of errors:

Situation 1: For the first two minutes the logs look normal; after that, a "Label already exists" error is reported. Every time I start the job I change the "sink.label-prefix" option in the "*.conf.template" configuration file so that each run uses a unique prefix, but the error still appears after a few minutes. Further testing showed that the error is tied to configuring multiple tables in the MySQL-CDC source: when data in several of these tables is modified at the same time, writing to Doris fails with the "Label already exists" error. With only one table configured there is no problem, but our business scenario requires writing many sharded tables into a single Doris table. Is there a solution to this problem?
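The labels in the error log look like <sink.label-prefix>_<database>_<table>_<numeric id>_<subtask index>_<checkpoint id>, for example label_92e1051f8765498e_0624_1016_tis_gxy_gxy_student_857450112475987971_0_13. Assuming that layout, the minimal sketch below illustrates one possible reason several source tables sharing one target table could collide: if each per-table writer builds its label only from the target table, all five writers (running as subtask 0 because parallelism = 1) produce the same label for the same checkpoint, which Doris rejects as LABEL_ALREADY_EXISTS. The layout is inferred from the logs, and buildLabel and countCollisions are hypothetical helpers, not SeaTunnel's label generator.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LabelCollisionSketch {

    // HYPOTHETICAL: label layout inferred from the logged labels,
    // <label-prefix>_<database>_<table>_<numeric id>_<subtask index>_<checkpoint id>
    static String buildLabel(String prefix, String database, String table,
                             long numericId, int subtaskIndex, long checkpointId) {
        return String.join("_", prefix, database, table,
                Long.toString(numericId), Integer.toString(subtaskIndex), Long.toString(checkpointId));
    }

    // HYPOTHETICAL: one writer per source table, all writing to tis_gxy.gxy_student
    // as subtask 0. Returns how many writers reuse a label that another writer
    // already took for the same checkpoint.
    static int countCollisions(List<String> sourceTables, String prefix, long numericId, long checkpointId) {
        Set<String> issued = new HashSet<>();
        int collisions = 0;
        for (String source : sourceTables) {
            // The source table name never enters the label, so it cannot make the label unique.
            String label = buildLabel(prefix, "tis_gxy", "gxy_student", numericId, 0, checkpointId);
            if (!issued.add(label)) {
                collisions++;
                System.out.println("duplicate label for " + source + ": " + label);
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        List<String> sources = Arrays.asList(
                "practice.gxy_student_0", "practice.gxy_student_1", "practice.gxy_student_2",
                "practice.gxy_student_3", "practice.gxy_student_4");
        int collisions = countCollisions(sources, "label_92e1051f8765498e_0624_1016",
                857450112475987971L, 13L);
        System.out.println("collisions = " + collisions); // prints "collisions = 4"
    }
}
```

If this reading is right, it would also explain why a fresh "sink.label-prefix" per run does not help: the duplicates arise between writers inside a single run, not across restarts. It is a hypothesis to check against the connector source, not a confirmed diagnosis.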

Situation 2: The task is interrupted during execution (InterruptedException): a thread is interrupted by another thread while waiting for some resource or condition, even though neither the servers nor network communication are currently under any load.

Service configuration: the SeaTunnel cluster has four servers with ample resources, running SeaTunnel 2.3.5. The Doris version is 2.1.2.

SeaTunnel Version

SeaTunnel Version: 2.3.5 Doris Version: 2.1.2

SeaTunnel Config

File: ./config/v2.gxy.streaming.conf.template
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://localhost:3306/practice?useSSL=false"
    username = "123"
    password = "123"
    table-names = [
      "practice.gxy_student_0",
      "practice.gxy_student_1",
      "practice.gxy_student_2",
      "practice.gxy_student_3",
      "practice.gxy_student_4"
    ]
    startup.mode = "latest"
  }
}

sink {
  Doris {
    fenodes = "localhost:8030"
    username = "123"
    password = "123"
    database = "tis_gxy"
    table = "gxy_student"
    table.identifier = "tis_gxy.gxy_student"
    sink.label-prefix = "label_92e1051f8765498e_0624_1016"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

File: ./config/hazelcast.yaml

hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        member-list:
          - 10.123.123.121
          - 10.123.123.122
          - 10.123.123.123
          - 10.123.123.124
    port:
      auto-increment: false
      port: 5801
  map:
    engine-map:
      map-store:
        enabled: true
        initial-mode: EAGER
        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
        properties:
          type: hdfs
          namespace: /tmp/seatunnel/imap
          clusterName: seatunnel-cluster
          storage.type: oss
          block.size: 10240
          oss.bucket: oss://mogudb/
          fs.oss.accessKeyId: **
          fs.oss.accessKeySecret: **
          fs.oss.endpoint: cn-qingdao-internal.oss-accesspoint.aliyuncs.com
          #fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30

Running Command

sh /data/seatunnel235/bin/seatunnel.sh --config ./config/v2.gxy.streaming.conf.template

Error Exception

Error in the first case:
org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [LABEL_ALREADY_EXISTS]Status : errCode = 2, detailMessage = Label [test-cdc_tis_gxy_gxy_student_856834878388305923_0_8] has already been used, relate to txn [5981062]
2024-06-24 10:43:24,275 INFO  [o.a.s.c.d.s.w.DorisStreamLoad ] [hz.main.seaTunnel.task.thread-31] - abort 5992921 for check label label_92e1051f8765498e_0624_1016_tis_gxy_gxy_student_857450112475987971_0_13.
2024-06-24 10:43:24,284 INFO  [o.a.s.c.d.s.w.DorisStreamLoad ] [hz.main.seaTunnel.task.thread-31] - abort for labelSuffix label_92e1051f8765498e_0624_1016_tis_gxy_gxy_student_857450112475987971_0 finished
2024-06-24 10:43:24,284 WARN  [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-31] - [10.29.86.93]:5801 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@4812e351
java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703) ~[seatunnel-starter.jar:2.3.5]
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) ~[seatunnel-starter.jar:2.3.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_91]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_91]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_91]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_91]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234) ~[seatunnel-transforms-v2.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
    ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error]
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_91]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_91]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232) ~[seatunnel-transforms-v2.jar:2.3.5]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
    ... 16 more
Caused by: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:220) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.flush(DorisSinkWriter.java:159) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.prepareCommit(DorisSinkWriter.java:145) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
    ... 5 more
Caused by: java.io.IOException: Stream closed
    at java.util.zip.GZIPInputStream.ensureOpen(GZIPInputStream.java:62) ~[?:1.8.0_91]
    at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:113) ~[?:1.8.0_91]
    at org.apache.http.client.entity.LazyDecompressingInputStream.read(LazyDecompressingInputStream.java:70) ~[seatunnel-hadoop3-3.3.6-uber.jar:2.7-WS-test-SNAPSHOT]
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[?:1.8.0_91]
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[?:1.8.0_91]
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[?:1.8.0_91]
    at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[?:1.8.0_91]
    at java.io.Reader.read(Reader.java:140) ~[?:1.8.0_91]
    at org.apache.http.util.EntityUtils.toString(EntityUtils.java:247) ~[seatunnel-hadoop3-3.3.6-uber.jar:2.7-WS-test-SNAPSHOT]
    at org.apache.http.util.EntityUtils.toString(EntityUtils.java:291) ~[seatunnel-hadoop3-3.3.6-uber.jar:2.7-WS-test-SNAPSHOT]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.handlePreCommitResponse(DorisStreamLoad.java:205) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:218) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.flush(DorisSinkWriter.java:159) ~[?:?]
    at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.prepareCommit(DorisSinkWriter.java:145) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
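The nested "Caused by" sections follow a standard JDK pattern: the trace shows MultiTableSinkWriter.prepareCommit running each table's prepareCommit as a task (the lambda$prepareCommit$4 frame) and then calling FutureTask.get, which reports the task's failure as an ExecutionException, with two RuntimeException layers added on top. The minimal standalone sketch below reproduces that layering with JDK classes only and walks to the innermost cause, which in this trace is java.io.IOException: Stream closed, raised while DorisStreamLoad.handlePreCommitResponse reads the stream load response. IllegalStateException stands in for DorisConnectorException, and the single-thread executor is an assumption, not SeaTunnel's actual thread pool.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CauseChainSketch {

    // Follow getCause() to the innermost throwable.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Reproduces the layering of the trace: a per-table task fails inside an executor,
    // Future.get() rethrows the failure as ExecutionException, and two callers wrap it
    // in RuntimeException. IllegalStateException stands in for DorisConnectorException.
    static RuntimeException simulate() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Void> perTableCommit = () -> {
                throw new IllegalStateException("stream load error", new IOException("Stream closed"));
            };
            Future<Void> future = pool.submit(perTableCommit);
            try {
                future.get();
                return null; // not reached: the task always throws
            } catch (ExecutionException e) {
                RuntimeException inner = new RuntimeException(e); // like MultiTableSinkWriter.prepareCommit
                return new RuntimeException(inner);                // like SinkFlowLifeCycle.received
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return new RuntimeException(e);
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        RuntimeException top = simulate();
        // Print every layer, outermost first, then the innermost cause.
        for (Throwable t = top; t != null; t = t.getCause()) {
            System.out.println(t.getClass().getName());
        }
        System.out.println("root cause: " + rootCause(top));
    }
}
```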

Error in the second case:
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
    ...
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:121)
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:201)
    ...
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:47)
    ...
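The innermost frames show MultiTableWriterRunnable waiting in a timed LinkedBlockingQueue.poll when the interrupt arrives. The minimal standalone sketch below demonstrates that JDK behavior (it is not SeaTunnel code; the queue contents, thread name, and 60-second timeout are arbitrary assumptions): a timed poll on an empty queue throws InterruptedException as soon as another thread interrupts the waiting thread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptedPollSketch {

    // Starts a worker that waits on an empty queue with a timed poll, as the
    // innermost frame of the trace does, then interrupts it from the calling thread.
    // Returns the exception the worker observed (null if it saw none).
    static Throwable runAndInterrupt() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicReference<Throwable> observed = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                // Nothing is ever offered, so this would block for up to 60 seconds.
                queue.poll(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                observed.set(e);
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }, "writer-sketch");

        worker.start();
        worker.interrupt(); // another thread interrupts the waiting worker
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("worker observed: " + runAndInterrupt());
    }
}
```

The exception therefore says only that some other thread interrupted the writer; by itself it does not point to CPU, memory, or network pressure, and the thread that issued the interrupt does not appear in this trace.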

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8.0_181

Screenshots

No response

github-actions[bot] commented 3 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 2 months ago

This issue has been closed because it has not received a response for too long. You can reopen it if you encounter similar problems in the future.

windmemory commented 1 month ago

@BigdataSurvey did you resolve this problem?

BigdataSurvey commented 1 month ago

This problem has not been solved yet. Please help.

At 2024-09-02 16:26:32, "Yuan Gao" wrote:

@BigdataSurvey did you resolve this problem?
