DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

[Bug] [StarRocks] When stream load returns 502, the metrics wrongly count records that were never written #1911

Closed: libailin closed this issue 1 month ago

libailin commented 1 month ago

Search before asking

What happened

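In SQL mode, when writing to StarRocks, a stream load request that fails with HTTP 502 does not fail the job: the job still finishes successfully, and the write metrics count the records of the failed batch as written. No data actually reaches the database, yet the metrics report those records.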
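A minimal sketch of the accounting the reporter expects. The `BatchFlusher` class and `StreamLoadSender` interface are hypothetical and are not ChunJun's `StreamLoadManager` API. The written-records counter only advances after a batch is acknowledged. Once retries are exhausted, the last error is rethrown so the job fails instead of reporting success.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical flusher illustrating "count only what was actually loaded".
public class BatchFlusher {

    /** Hypothetical abstraction over one stream load HTTP call; throws on any failure. */
    public interface StreamLoadSender {
        void send(List<String> rows) throws IOException;
    }

    private final StreamLoadSender sender;
    private final int maxAttempts;
    // Counts only rows that StarRocks acknowledged.
    private final AtomicLong numWritten = new AtomicLong();

    public BatchFlusher(StreamLoadSender sender, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.sender = sender;
        this.maxAttempts = maxAttempts;
    }

    public void flush(List<String> rows) throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.send(rows);
                // Success: only now is the batch reflected in the metric.
                numWritten.addAndGet(rows.size());
                return;
            } catch (IOException e) {
                // e.g. "Request failed with code:502"; retry without touching the counter
                last = e;
            }
        }
        // Retries exhausted: propagate so the job fails instead of succeeding silently.
        throw new IOException("Stream load failed after " + maxAttempts + " attempts", last);
    }

    public long numWritten() {
        return numWritten.get();
    }
}
```

With a sender that always fails, `flush` throws after the configured number of attempts and `numWritten()` stays at 0. This is the behaviour the issue asks for when the endpoint returns 502.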

What you expected to happen

2024-07-25 04:01:08,540 INFO  com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadVisitor - Start to join batch data: label[chunjun_connector_20240725_040107_460e1ad9b3714d2a807d19dd722a678f].
2024-07-25 04:01:08,541 INFO  com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadVisitor - Executing stream load to: 'http://127.0.0.1:8031/api/xxx/xxx/_stream_load', size: '2357267', thread: 79
2024-07-25 04:01:08,552 WARN  com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadVisitor - Request failed with code:502
2024-07-25 04:01:08,552 WARN  com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager - Failed to flush batch data to StarRocks, retry times = 1
java.io.IOException: Unable to flush data to StarRocks: unknown result status, usually caused by: 1.authorization or permission related problems. 2.Wrong column_separator or row_delimiter. 3.Column count exceeded the limitation.
    at com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadVisitor.dealStreamLoadResult(StarRocksStreamLoadVisitor.java:98) ~[chunjun-connector-starrocks-master.jar:?]
    at com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:90) ~[chunjun-connector-starrocks-master.jar:?]
    at com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager.asyncFlush(StreamLoadManager.java:278) ~[chunjun-connector-starrocks-master.jar:?]
    at com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager.lambda$startAsyncFlushing$3(StreamLoadManager.java:237) ~[chunjun-connector-starrocks-master.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
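The log reports the 502 only as a generic "unknown result status" error. A sketch that checks the HTTP status code before interpreting the body, so a gateway error is named explicitly and never mistaken for a successful load. It assumes a hypothetical helper `checkStreamLoadResponse`, which is not a ChunJun method. It also assumes the StarRocks stream load reply is JSON carrying a `"Status"` field. The sketch conservatively treats any status other than `Success` as a failure, and it uses a simple regex instead of a JSON library to stay self-contained.

```java
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical response check; not part of ChunJun.
public final class StreamLoadResponseCheck {

    // Extracts the value of the "Status" field from a stream load JSON reply.
    private static final Pattern STATUS = Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    private StreamLoadResponseCheck() {}

    /** Throws unless the HTTP call succeeded AND the body reports Status = Success. */
    public static void checkStreamLoadResponse(int httpCode, String body) throws IOException {
        if (httpCode < 200 || httpCode >= 300) {
            // A proxy/gateway error such as 502 means the batch was not loaded.
            throw new IOException("Stream load HTTP request failed with code " + httpCode);
        }
        Matcher m = STATUS.matcher(body == null ? "" : body);
        if (!m.find()) {
            throw new IOException("Stream load response has no Status field: " + body);
        }
        String status = m.group(1);
        if (!"Success".equals(status)) {
            // Conservative: any other status is treated as a failed batch.
            throw new IOException("Stream load failed with status " + status + ": " + body);
        }
    }
}
```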

How to reproduce

Simulate a stream load URL that returns 502.

First, start a Spring Boot RestController:

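import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MockStreamLoadController {

    // Mock stream load endpoint that always answers 502 Bad Gateway
    @RequestMapping("/api/databaseName/tableName/_stream_load")
    public ResponseEntity<String> api(@RequestHeader Map<String, String> headers, @RequestBody byte[] data) {
        System.out.println();
        System.out.println("==============================================");
        // Print all request headers to the log
        headers.forEach((key, value) -> System.out.println(String.format("Header '%s' = %s", key, value)));

        String receivedData = new String(data, StandardCharsets.UTF_8);
        System.out.println("Received data: " + receivedData);
        return new ResponseEntity<>("Resource not found", HttpStatus.BAD_GATEWAY);
    }
}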
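If pulling in Spring Boot is inconvenient, the same mock can be sketched with the JDK's built-in `com.sun.net.httpserver.HttpServer`, which ships with OpenJDK and Oracle JDK builds. The class name `Mock502StreamLoadServer` and port 8080 are assumptions chosen to match the URL rewrite below. The server logs the headers, drains the request body, and answers every request under the stream load path with 502. It avoids Java 9+ APIs so it also runs on the Java 8 runtime seen in the log.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical dependency-free mock of a stream load endpoint that always fails with 502.
public class Mock502StreamLoadServer {

    public static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/api/databaseName/tableName/_stream_load", exchange -> {
            // Log every request header, like the Spring controller does.
            exchange.getRequestHeaders().forEach((k, v) -> System.out.println("Header '" + k + "' = " + v));
            // Drain the request body (Java 8 compatible, no readAllBytes).
            try (InputStream in = exchange.getRequestBody()) {
                byte[] buf = new byte[8192];
                long total = 0;
                int n;
                while ((n = in.read(buf)) != -1) {
                    total += n;
                }
                System.out.println("Received bytes: " + total);
            }
            byte[] reply = "Resource not found".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(502, reply.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(reply);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        start(8080);
        System.out.println("Mock stream load endpoint listening on 127.0.0.1:8080");
    }
}
```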

Then modify StarRocksStreamLoadVisitor.java in ChunJun as follows:


    private Map<String, Object> doHttpPut(
            String loadUrl, String label, byte[] data, String httpHeadColumns) throws IOException {
        log.info(
                String.format(
                        "Executing stream load to: '%s', size: '%s', thread: %d",
                        loadUrl, data.length, Thread.currentThread().getId()));
        // Added: redirect the real stream load URL to the mock endpoint above, which returns 502
        loadUrl = loadUrl.replace("xxxx", "127.0.0.1:8080");
        // ... rest of the method unchanged

Test SQL:


CREATE TABLE source
(
    `id`    bigint,
    `username`  string,
    `age`   int
) with (
      'connector' = 'stream-x',
      'number-of-rows' = '2000'
);

CREATE TABLE sink
(
    `id`    bigint,
    `username`  string,
    `age`   int
) with (
      'connector' = 'starrocks-x',
      'url' = 'jdbc:mysql://x.x.x.x:9030/test?useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
      'fe-nodes' = 'x.x.x.x:8030',
      'schema-name' = 'test',
      'table-name' = 'test_aaa',
      'username' = 'xxx',
      'password' = 'xxx'
);

insert into sink select * from source;

Anything else

No response

Version

master

Are you willing to submit a PR?

Code of Conduct