apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [REST API] submit-job: sink param schema_save_mode does not take effect #8085

Open WeiLi1201 opened 2 days ago


Search before asking

What happened

The configuration below runs correctly when I submit it with the shell script. However, when I convert it to JSON and submit it through the submit-job REST API, the job fails with an error saying that the table does not exist in Doris. With the shell script, the target table is created automatically.

The job is submitted successfully, but it fails after about one minute, reporting that the Doris table does not exist.

Could you please check whether something is wrong with my configuration? The official documentation has only a few examples of the JSON format accepted by this API.


SeaTunnel Version

2.3.8

SeaTunnel Config

{
  "env": {
      "job.name": "cdc_single_table_to_doris",
      "parallelism": 1,
      "job.mode": "STREAMING",
      "checkpoint.interval": 10000,
      "job.id": 125161485867595251
  },
  "source": [
      {
          "plugin_name": "MySQL-CDC",
          "base-url": "jdbc:mysql://localhost:3306/res_customer",
          "username": "ssssssxxxx",
          "password": "6LR%xxxxxxx",
          "database-names": [
              "res_customer",
              "res_content"
          ],
          "table-names": [
              "res_customer.after_sale_customer_follow_up",
              "res_customer.after_sale_customer_follow_up_records",
              "res_customer.customer_coach_associate",
              "res_content.poster",
              "res_content.marketing_content"
          ],
          "startup.mode": "initial",
          "connect.max-retries": 5,
          "debezium": {
              "include.schema.changes": true
          }
      }
  ],
  "sink": [
      {
          "plugin_name": "Doris",
          "fenodes": "server1:8030,server2:8030,server3:8030,server4:8030",
          "username": "ssssss",
          "password": "sxxxx777",
          "database": "db",
          "schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST",
          "data_save_mode": "APPEND_DATA",
          "table": "${database_name}_${table_name}",
          "sink.max-retries": 5,
          "sink.enable-delete": "true",
          "doris.config": {
              "format": "json",
              "read_json_by_line": "true"
          }
      }
  ]
}
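The sink's "table" option uses the placeholders ${database_name} and ${table_name}. The Python sketch below shows how that pattern is expected to expand for each source table in "table-names", assuming each placeholder is filled in from the upstream database and table name. resolve_sink_table is a hypothetical helper for illustration, not SeaTunnel code.

```python
from string import Template

# Sink "table" option from the config above; ${database_name} and ${table_name}
# are assumed to be filled in from each upstream source table.
SINK_TABLE_PATTERN = "${database_name}_${table_name}"


def resolve_sink_table(source_table: str, pattern: str = SINK_TABLE_PATTERN) -> str:
    """Hypothetical helper: expand the sink table pattern for a 'database.table' id."""
    database_name, table_name = source_table.split(".", 1)
    return Template(pattern).substitute(
        database_name=database_name, table_name=table_name
    )


SOURCE_TABLES = [
    "res_customer.after_sale_customer_follow_up",
    "res_customer.after_sale_customer_follow_up_records",
    "res_customer.customer_coach_associate",
    "res_content.poster",
    "res_content.marketing_content",
]

for source_table in SOURCE_TABLES:
    # The Doris sink writes into database "db", so each table lands in db.<resolved name>.
    print(f"{source_table} -> db.{resolve_sink_table(source_table)}")
```

Under this expansion, res_content.marketing_content maps to db.res_content_marketing_content, which is the table name reported in the error below, so each expanded name must either already exist in Doris or be created by schema_save_mode before data is loaded.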

Running Command

POST /hazelcast/rest/maps/submit-job (the JSON config above is sent as the request body)
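To make the submission reproducible, here is a minimal Python sketch that builds (but does not send) the POST request for this endpoint using only the standard library. The host and port (localhost:5801) are assumptions to adjust for your cluster, and build_submit_request is a hypothetical helper; only a fragment of the job config is shown.

```python
import json
import urllib.request

# Endpoint path taken from the "Running Command" above.
SUBMIT_PATH = "/hazelcast/rest/maps/submit-job"


def build_submit_request(base_url: str, config: dict) -> urllib.request.Request:
    """Build (without sending) a POST request that carries the job config as JSON."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + SUBMIT_PATH,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumption: the Zeta REST service is reachable at localhost:5801; adjust as needed.
# Only part of the job config is shown here; in practice pass the full config dict.
example_config = {
    "env": {"job.name": "cdc_single_table_to_doris", "job.mode": "STREAMING"},
    "source": [],
    "sink": [],
}
request = build_submit_request("http://localhost:5801", example_config)
print(request.get_method(), request.full_url)
# urllib.request.urlopen(request) would actually submit the job.
```

Building the request separately from sending it makes it easy to inspect exactly what body the server receives when comparing it against the config used with the shell script.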

Error Exception

{
    "jobId": "125161485867595251",
    "jobName": "cdc_single_table_to_doris",
    "jobStatus": "FAILED",
    "errorMsg": "java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - [NOT_FOUND]TStatus: errCode = 7, detailMessage = unknown table, tableName=res_content_marketing_content\n\tat org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)\n\tat org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)\n\tat org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)\n\tat org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)\n\tat org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)\n\tat org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)\n\tat org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)\n\tat org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - [NOT_FOUND]TStatus: errCode = 7, detailMessage = unknown table, tableName=res_content_marketing_content\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:258)\n\tat org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:188)\n\t... 17 more\nCaused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - [NOT_FOUND]TStatus: errCode = 7, detailMessage = unknown table, tableName=res_content_marketing_content\n\tat java.util.concurrent.FutureTask.report(FutureTask.java:122)\n\tat java.util.concurrent.FutureTask.get(FutureTask.java:192)\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:256)\n\t... 18 more\nCaused by: java.lang.RuntimeException: java.io.IOException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - [NOT_FOUND]TStatus: errCode = 7, detailMessage = unknown table, tableName=res_content_marketing_content\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:243)\n\t... 6 more\nCaused by: java.io.IOException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - [NOT_FOUND]TStatus: errCode = 7, detailMessage = unknown table, tableName=res_content_marketing_content\n\tat org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.stopBufferData(RecordBuffer.java:96)\n\tat org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.endInput(RecordStream.java:41)\n\tat org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:224)\n\tat org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.flush(DorisSinkWriter.java:156)\n\tat org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.prepareCommit(DorisSinkWriter.java:143)\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:241)\n\t... 6 more\nCaused by: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - [NOT_FOUND]TStatus: errCode = 7, detailMessage = unknown table, tableName=res_content_marketing_content\n\tat org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.checkErrorMessageByStreamLoad(RecordBuffer.java:143)\n\tat org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.stopBufferData(RecordBuffer.java:88)\n\t... 11 more\n",
    "createTime": "2024-11-19 18:09:07",
    "finishTime": "2024-11-19 18:10:00",
    "jobDag": "{\"jobId\":125161485867595251,\"pipelineEdges\":{\"1\":[{\"inputVertexId\":1,\"targetVertexId\":2}]},\"vertexInfoMap\":{\"1\":{\"vertexId\":1,\"type\":\"SOURCE\",\"connectorType\":\"pipeline-1 [Source[0]-MySQL-CDC]\"},\"2\":{\"vertexId\":2,\"type\":\"SINK\",\"connectorType\":\"pipeline-1 [Sink[0]-Doris-MultiTableSink]\"}}}",
    "pluginJarsUrls": [],
    "metrics": {
        "TableSourceReceivedCount": {
            "res_content.poster": "136",
            "res_content.marketing_content": "12",
            "res_customer.customer_coach_associate": "5828",
            "res_customer.after_sale_customer_follow_up_records": "260",
            "res_customer.after_sale_customer_follow_up": "102900"
        },
        "TableSourceReceivedQPS": {
            "res_content.poster": "2.5955684485753",
            "res_content.marketing_content": "0.22901637467078897",
            "res_customer.customer_coach_associate": "111.22774204630036",
            "res_customer.after_sale_customer_follow_up_records": "4.962021451200427",
            "res_customer.after_sale_customer_follow_up": "1963.8154128020153"
        },
        "TableSinkWriteQPS": {
            "res_content.poster": "2.592649077321946",
            "res_content.marketing_content": "0.22876315388134819",
            "res_customer.customer_coach_associate": "111.10263840170809",
            "res_customer.after_sale_customer_follow_up_records": "4.9565350007625435",
            "res_customer.after_sale_customer_follow_up": "1961.6440445325607"
        },
        "TableSinkWriteCount": {
            "res_content.poster": "136",
            "res_content.marketing_content": "12",
            "res_customer.customer_coach_associate": "5828",
            "res_customer.after_sale_customer_follow_up_records": "260",
            "res_customer.after_sale_customer_follow_up": "102900"
        },
        "SinkWriteCount": "109136",
        "SinkWriteBytesPerSeconds": "374443.11422906816",
        "TableSinkWriteBytesPerSeconds": {
            "res_content.poster": "650.5261552539271",
            "res_content.marketing_content": "37.36464846728687",
            "res_customer.customer_coach_associate": "19550.937928930914",
            "res_customer.after_sale_customer_follow_up_records": "1053.911849931371",
            "res_customer.after_sale_customer_follow_up": "353150.3736464847"
        },
        "SinkWriteQPS": "2080.5246301662346",
        "SourceReceivedBytes": "19641788",
        "SourceReceivedBytesPerSeconds": "374857.58998435055",
        "SourceReceivedCount": "109136",
        "SourceReceivedQPS": "2082.8275888392686",
        "TableSourceReceivedBytesPerSeconds": {
            "res_content.poster": "651.2462307721669",
            "res_content.marketing_content": "37.40600786289553",
            "res_customer.customer_coach_associate": "19572.952649960876",
            "res_customer.after_sale_customer_follow_up_records": "1055.0784381083247",
            "res_customer.after_sale_customer_follow_up": "353541.2802015344"
        },
        "TableSourceReceivedBytes": {
            "res_content.poster": "34124",
            "res_content.marketing_content": "1960",
            "res_customer.customer_coach_associate": "1025564",
            "res_customer.after_sale_customer_follow_up_records": "55284",
            "res_customer.after_sale_customer_follow_up": "18524856"
        },
        "SinkWriteBytes": "19641788",
        "TableSinkWriteBytes": {
            "res_content.poster": "34124",
            "res_content.marketing_content": "1960",
            "res_customer.customer_coach_associate": "1025564",
            "res_customer.after_sale_customer_follow_up_records": "55284",
            "res_customer.after_sale_customer_follow_up": "18524856"
        }
    }
}
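The job-info response above is long. The Python sketch below pulls out the table named in the Doris "unknown table" error and compares per-table source and sink counts from the metrics. summarize_failure is a hypothetical helper, and the regular expression assumes the error text keeps the "unknown table, tableName=..." form seen in errorMsg.

```python
import re

# Pattern of the Doris stream-load error seen in errorMsg above.
UNKNOWN_TABLE = re.compile(r"unknown table, tableName=(\w+)")


def summarize_failure(job_info: dict) -> dict:
    """Pull the missing Doris table and per-table count gaps out of a job-info response."""
    match = UNKNOWN_TABLE.search(job_info.get("errorMsg", ""))
    metrics = job_info.get("metrics", {})
    received = {t: int(n) for t, n in metrics.get("TableSourceReceivedCount", {}).items()}
    written = {t: int(n) for t, n in metrics.get("TableSinkWriteCount", {}).items()}
    return {
        "status": job_info.get("jobStatus"),
        "missing_table": match.group(1) if match else None,
        # Tables whose sink write count differs from the source receive count.
        "count_gaps": sorted(t for t in received if written.get(t, 0) != received[t]),
        "total_written": sum(written.values()),
    }


# Trimmed copy of the response above (errorMsg shortened, counts unchanged).
counts = {
    "res_content.poster": "136",
    "res_content.marketing_content": "12",
    "res_customer.customer_coach_associate": "5828",
    "res_customer.after_sale_customer_follow_up_records": "260",
    "res_customer.after_sale_customer_follow_up": "102900",
}
job_info = {
    "jobStatus": "FAILED",
    "errorMsg": "... detailMessage = unknown table, tableName=res_content_marketing_content\n\tat ...",
    "metrics": {"TableSourceReceivedCount": counts, "TableSinkWriteCount": dict(counts)},
}
print(summarize_failure(job_info))
```

For this response the summary reports res_content_marketing_content as the missing table, no count gaps, and 109136 records written, which matches SinkWriteCount. Since every received record was also counted as written, the failure surfaces at commit time, when the buffered data is stream-loaded into Doris (DorisSinkWriter.prepareCommit, flush, then DorisStreamLoad.stopLoad in the stack trace), which is consistent with the target table never having been created.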

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

1.8

Screenshots


Are you willing to submit a PR?

Code of Conduct