apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [MySQL-CDC to Doris] Streaming job status shows FAILED while the job is still running, and it cannot be stopped manually #8065

Open exitNA opened 6 days ago

exitNA commented 6 days ago

Search before asking

What happened

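I deployed the Zeta engine in separated cluster mode with one master node and one worker node, and used MySQL-CDC to sync data into Doris. After the job had been running for a while, its status showed FAILED. I restored the job manually, but shortly afterwards it showed FAILED again. However, Doris was still receiving new data, and the log of the "failed" job was still being updated.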

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  job.name = "stream-test"
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  checkpoint.timeout = 60000
  parallelism = 4
  read_limit.rows_per_second=100
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/testdb"
    username = "test_user"
    password = "test_pwd"
    snapshot.fetch.size = 200
    connection.pool.size = 4
    startup.mode = "initial"
    database-names = [
      "testdb"
    ]
    table-names = [
      "testdb.t1",
      "testdb.t2",
      "testdb.t3",
      "testdb.t4"
    ]
    debezium {
      include.schema.changes = true
    }
  }
}

sink {
  Doris {
    fenodes = "127.0.0.1:8030"
    query-port = 9030
    username = "test_user"
    password = "test_pwd"
    database = "${database_name}"
    table = "${table_name}"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    save_mode_create_template = """
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
  ${rowtype_primary_key},
  ${rowtype_fields},
  sync_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
  "replication_allocation" = "tag.location.default: 3",
  "in_memory" = "false",
  "storage_format" = "V2",
  "disable_auto_compaction" = "false"
)
"""
  }
}

Running Command

./bin/seatunnel.sh -c job/stream-test.config -r 909634676551843842 --async

Error Exception

org.apache.seatunnel.engine.common.exception.TaskGroupDeployException: java.lang.RuntimeException: TaskGroupLocation: TaskGroupLocation{jobId=909634676551843842, pipelineId=1, taskGroupId=1} already exists
    at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:337)
    at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:268)
    at org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation.runInternal(DeployTaskOperation.java:55)
    at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44)
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)

    at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.stateProcess(PhysicalVertex.java:574)
    at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.updateTaskState(PhysicalVertex.java:403)
    at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.makeTaskGroupDeploy(PhysicalVertex.java:317)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$stateProcess$19(SubPlan.java:643)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:639)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:633)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:624)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.startSubPlanStateProcess(SubPlan.java:608)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.restorePipeline(SubPlan.java:496)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:676)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
    at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$addPhysicalVertexCallBack$4(SubPlan.java:228)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
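The trace shows the failure path: `SubPlan.restorePipeline` on the master redeploys the task group via `PhysicalVertex.makeTaskGroupDeploy`, and `TaskExecutionService.deployTask` (reached through `DeployTaskOperation`) rejects it because a task group with the same `TaskGroupLocation` is still registered. One plausible reading, consistent with Doris still receiving data and the job log still updating, is that the original task group was never removed after the pipeline was marked FAILED. The Java sketch below is a simplified, hypothetical model of that duplicate check only; `TaskGroupRegistrySketch`, its `Location` type and the `deploy`/`finish` methods are illustrative names, not SeaTunnel's actual API or implementation.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of a worker-side task registry; not SeaTunnel code.
public class TaskGroupRegistrySketch {

    // Illustrative stand-in for TaskGroupLocation: uniquely identifies one task group.
    static final class Location {
        final long jobId;
        final int pipelineId;
        final long taskGroupId;

        Location(long jobId, int pipelineId, long taskGroupId) {
            this.jobId = jobId;
            this.pipelineId = pipelineId;
            this.taskGroupId = taskGroupId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Location)) return false;
            Location other = (Location) o;
            return jobId == other.jobId
                    && pipelineId == other.pipelineId
                    && taskGroupId == other.taskGroupId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobId, pipelineId, taskGroupId);
        }

        @Override
        public String toString() {
            return "TaskGroupLocation{jobId=" + jobId + ", pipelineId=" + pipelineId
                    + ", taskGroupId=" + taskGroupId + "}";
        }
    }

    // Task groups currently registered on this simulated worker.
    private final Map<Location, String> executing = new ConcurrentHashMap<>();

    // Registers a task group; rejects a location that is still registered.
    void deploy(Location location, String attempt) {
        // putIfAbsent returns the existing value (non-null) if the key is already present.
        if (executing.putIfAbsent(location, attempt) != null) {
            throw new RuntimeException("TaskGroupLocation: " + location + " already exists");
        }
    }

    // Unregisters a task group once it has really stopped.
    void finish(Location location) {
        executing.remove(location);
    }

    public static void main(String[] args) {
        TaskGroupRegistrySketch worker = new TaskGroupRegistrySketch();
        Location location = new Location(909634676551843842L, 1, 1L);
        worker.deploy(location, "original attempt");

        // The pipeline is marked FAILED and restored, but finish() was never
        // called for the original attempt, so the redeploy is rejected.
        try {
            worker.deploy(location, "restore attempt");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints the same message that appears after `java.lang.RuntimeException:` in the trace. In this model, only calling `finish` for the stale location lets the restore succeed, which is why a task group that keeps running after being reported FAILED would block every subsequent restore.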

Zeta or Flink or Spark Version

zeta: 2.3.8

Java or Scala Version

jdk:1.8.0_111

Screenshots

No response

Are you willing to submit PR?

Code of Conduct