apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

REST API /hazelcast/rest/maps/finished-jobs: the returned SinkWriteCount is inaccurate and always equals SourceReceivedCount #8101

Open houjiakun opened 3 days ago


Search before asking

What happened

The SinkWriteCount returned by the REST API /hazelcast/rest/maps/finished-jobs is inaccurate: it is always the same as SourceReceivedCount. During testing I created some duplicate data. The source table has 13 rows, one of which is a duplicate. The ID column is the primary key of the target table. After the SeaTunnel job ran, 12 rows were written to the target table, and the remaining row failed to write because its primary key was a duplicate. However, the SinkWriteCount returned by the API is 13. How should I configure the job so that SinkWriteCount is correct?
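The gap between the two numbers can be illustrated with a small, hypothetical Java model (not SeaTunnel source code). It assumes two things: that the JDBC sink writer only buffers rows in write() and sends them to MySQL when the batch is flushed at commit time, which the stack trace below suggests (JdbcOutputFormat.flush is called from prepareCommit), and that SinkWriteCount is incremented once per row handed to the writer rather than once per row the database accepted. The class and method names are made up for illustration, and the model simply keeps the non-duplicate rows, matching the 12-row result described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model, NOT SeaTunnel code. Assumption: the sink-side counter moves
// when a row is handed to the writer, while rows reach the database only on flush.
final class BufferedSinkModel {
    private final List<Integer> buffer = new ArrayList<>();
    private final Set<Integer> targetPrimaryKeys = new HashSet<>(); // rows stored in the target table
    private long reportedWriteCount = 0; // stand-in for SinkWriteCount
    private String lastError = null;

    // Called once per row: the row is only buffered, but the counter already advances.
    void write(int id) {
        buffer.add(id);
        reportedWriteCount++;
    }

    // Sends buffered rows to the "table"; a repeated id violates the primary key and is rejected.
    void flush() {
        for (int id : buffer) {
            if (!targetPrimaryKeys.add(id)) {
                lastError = "Duplicate entry '" + id + "' for key 'PRIMARY'";
            }
        }
        buffer.clear();
    }

    long reportedWriteCount() { return reportedWriteCount; }

    int storedRowCount() { return targetPrimaryKeys.size(); }

    String lastError() { return lastError; }

    public static void main(String[] args) {
        // 13 source rows: ids 1..12 plus a second row with id 3.
        int[] sourceIds = {1, 2, 3, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
        BufferedSinkModel sink = new BufferedSinkModel();
        for (int id : sourceIds) {
            sink.write(id);
        }
        sink.flush();
        System.out.println("reported=" + sink.reportedWriteCount()
                + " stored=" + sink.storedRowCount()
                + " error=" + sink.lastError());
    }
}
```

Running main prints `reported=13 stored=12 error=Duplicate entry '3' for key 'PRIMARY'`. Under this model, the reported count can only match the stored rows if the counter is advanced after a successful flush instead of in write().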

SeaTunnel Version

2.3.7

SeaTunnel Config

env {
    job.mode = "BATCH"
    job.name = "data_label"
    job.retry.times = 0
    parallelism = 1
}

source {
    Jdbc {
        url = "jdbc:mysql://xxx:3306/test?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useUnicode=true&useSSL=false"
        driver = "com.mysql.jdbc.Driver"
        user = "root"
        password = "xxx"
        result_table_name = "data_label_src"
        query = "SELECT id as id, label_name AS label_name,order_no as order_no,is_del AS is_del,crt_time AS crt_time,crt_user AS crt_user,crt_name AS crt_name,crt_host AS crt_host,upd_time AS upd_time,upd_user AS upd_user,upd_name AS upd_name,upd_host AS upd_host,description AS description FROM data_label"
    }
}

sink {
    Jdbc {
        url = "jdbc:mysql://xxxx:3306/test1?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useUnicode=true&useSSL=false"
        driver = "com.mysql.jdbc.Driver"
        user = "root"
        password = "xxxx"
        source_table_name = "data_label_src"
        database = "test1"
        schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
        max_retries = 0
        table = "data_label"
        generate_sink_sql = true
    }
}

Running Command

./bin/seatunnel.sh --config test.config -m cluster

Error Exception

Response returned by http://xxx:5801/hazelcast/rest/maps/finished-jobs:
[
    {
        "jobId": "912151384986484738",
        "jobName": "data_label",
        "jobStatus": "FAILED",
        "errorMsg": "java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:292)\n\tat org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:74)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)\n\tat org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)\n\tat org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)\n\tat org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)\n\tat org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)\n\tat org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234)\n\tat org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:208)\n\t... 16 more\nCaused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat java.util.concurrent.FutureTask.report(FutureTask.java:122)\n\tat java.util.concurrent.FutureTask.get(FutureTask.java:192)\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232)\n\t... 17 more\nCaused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:142)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcExactlyOnceSinkWriter.prepareCurrentTx(JdbcExactlyOnceSinkWriter.java:202)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcExactlyOnceSinkWriter.prepareCommit(JdbcExactlyOnceSinkWriter.java:144)\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217)\n\t... 5 more\nCaused by: java.sql.BatchUpdateException: Duplicate entry '3' for key 'PRIMARY'\n\tat sun.reflect.GeneratedConstructorAccessor55.newInstance(Unknown Source)\n\tat sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)\n\tat java.lang.reflect.Constructor.newInstance(Constructor.java:423)\n\tat com.mysql.cj.util.Util.handleNewInstance(Util.java:192)\n\tat com.mysql.cj.util.Util.getInstance(Util.java:167)\n\tat com.mysql.cj.util.Util.getInstance(Util.java:174)\n\tat com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)\n\tat com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:800)\n\tat com.mysql.cj.jdbc.StatementWrapper.executeBatch(StatementWrapper.java:545)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.executeBatch(FieldNamedPreparedStatement.java:534)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:51)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:53)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:172)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:136)\n\t... 8 more\nCaused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '3' for key 'PRIMARY'\n\tat com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)\n\tat com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1098)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1046)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1371)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:716)\n\t... 16 more\n",
        "createTime": "2024-11-22 09:23:57",
        "finishTime": "2024-11-22 09:23:59",
        "jobDag": "{\"jobId\":912151384986484738,\"pipelineEdges\":{\"1\":[{\"inputVertexId\":1,\"targetVertexId\":2}]},\"vertexInfoMap\":{\"1\":{\"vertexId\":1,\"type\":\"SOURCE\",\"connectorType\":\"pipeline-1 [Source[0]-Jdbc]\"},\"2\":{\"vertexId\":2,\"type\":\"SINK\",\"connectorType\":\"pipeline-1 [Sink[0]-Jdbc-MultiTableSink]\"}}}",
        "pluginJarsUrls": [],
        "metrics": {
            "TableSourceReceivedCount": {
                "default.default.default": "13"
            },
            "SourceReceivedCount": "13",
            "TableSinkWriteCount": {
                "default.default.default": "13"
            },
            "SinkWriteCount": "13"
        }
    }
]

Zeta or Flink or Spark Version

No response

Java or Scala Version

java-1.8.0_412

Screenshots

No response

Are you willing to submit PR?

Code of Conduct