datavane / tis

Support agile DataOps Based on Flink, DataX and Flink-CDC, Chunjun with Web-UI
https://tis.pub
Apache License 2.0
989 stars 217 forks source link

mysql->doris 增量执行streamload出错 #285

Closed baisui1981 closed 7 months ago

baisui1981 commented 11 months ago

环境

1、TIS版本:3.8(最新版本) 2、doris版本:2.0.2(最新版本) 3、mysql版本:8.0.34

错误信息

chunjun_connector_20231107_151246_4a1b7abc22154e6fb281265549979547
java.net.UnknownHostException: null: Temporary failure in name resolution
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[?:1.8.0_382]
    at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:867) ~[?:1.8.0_382]
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1302) ~[?:1.8.0_382]
    at java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:815) ~[?:1.8.0_382]
    at java.net.InetAddress.getAllByName0(InetAddress.java:1291) ~[?:1.8.0_382]
    at java.net.InetAddress.getAllByName(InetAddress.java:1144) ~[?:1.8.0_382]
    at java.net.InetAddress.getAllByName(InetAddress.java:1065) ~[?:1.8.0_382]
    at org.apache.http.impl.conn.SystemDefaultDnsResolver.resolve(SystemDefaultDnsResolver.java:45) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:112) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.11.jar:4.5.11]
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[httpclient-4.5.11.jar:4.5.11]
    at com.dtstack.chunjun.connector.doris.rest.DorisStreamLoad.loadBatch(DorisStreamLoad.java:216) ~[chunjun-connector-doris-1.12.5.jar:?]
    at com.dtstack.chunjun.connector.doris.rest.DorisStreamLoad.load(DorisStreamLoad.java:192) ~[chunjun-connector-doris-1.12.5.jar:?]
    at com.dtstack.chunjun.connector.doris.DorisUtil.doRetry(DorisUtil.java:72) [chunjun-connector-doris-1.12.5.jar:?]
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.flush(DorisLoadClient.java:314) [chunjun-connector-doris-1.12.5.jar:?]
    at com.dtstack.chunjun.connector.doris.rest.DorisLoadClient.process(DorisLoadClient.java:156) [chunjun-connector-doris-1.12.5.jar:?]
    at com.dtstack.chunjun.connector.doris.sink.DorisHttpOutputFormat.writeSingleRecordInternal(DorisHttpOutputFormat.java:89) [chunjun-connector-doris-1.12.5.jar:?]
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:483) [chunjun-core-1.12.5.jar:?]
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:288) [chunjun-core-1.12.5.jar:?]
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:97) [chunjun-core-1.12.5.jar:?]
    at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:119) [chunjun-core-1.12.5.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:47) [tis-flink-extends-dist-3.8.0.jar:?]
    at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:32) [tis-flink-extends-dist-3.8.0.jar:?]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:239) [flink-connector-debezium-2.1.0.jar:2.1.0]
    at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:225) [flink-connector-debezium-2.1.0.jar:2.1.0]
    at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:151) [flink-connector-debezium-2.1.0.jar:2.1.0]
    at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:439) [flink-connector-debezium-2.1.0.jar:2.1.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
baisui1981 commented 10 months ago

原因分析

Doris 版本升级导致返回的 BE 信息 JSON 内容不一致:Doris 2.0.2 中主机地址字段名为 "Host",而旧版本中为 "IP" / "HostName"。这个是 TIS 3.8 环境(Doris 2.0.2)返回的:

{
    "href_columns": ["BackendId"],
    "parent_url": "/rest/v1/system?path=/",
    "column_names": ["BackendId", "Host", "HeartbeatPort", "BePort", "HttpPort", "BrpcPort", "LastStartTime", "LastHeartbeat", "Alive", "SystemDecommissioned", "TabletNum", "DataUsedCapacity", "TrashUsedCapcacity", "AvailCapacity", "TotalCapacity", "UsedPct", "MaxDiskUsedPct", "RemoteUsedCapacity", "Tag", "ErrMsg", "Version", "Status", "HeartbeatFailureCounter", "NodeRole"],
    "rows": [{
        "LastHeartbeat": "2023-11-07 14:48:48",
        "DataUsedCapacity": "1.060 MB",
        "ErrMsg": "",
        "__hrefPaths": ["/rest/v1/system?path=//backends/10005"],
        "BrpcPort": "8060",
        "BePort": "9060",
        "Version": "doris-2.0.2-rc05-ae923f7",
        "HeartbeatPort": "9050",
        "TabletNum": "22",
        "TotalCapacity": "96.150 GB",
        "HeartbeatFailureCounter": "0",
        "HttpPort": "8040",
        "Status": "{\"lastSuccessReportTabletsTime\":\"2023-11-07 14:48:42\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
        "SystemDecommissioned": "false",
        "UsedPct": "48.56 %",
        "Host": "172.16.102.148", // 此处为Host
        "Alive": "true",
        "MaxDiskUsedPct": "48.56 %",
        "TrashUsedCapcacity": "0.000 ",
        "NodeRole": "mix",
        "AvailCapacity": "49.459 GB",
        "RemoteUsedCapacity": "0.000 ",
        "BackendId": "10005",
        "LastStartTime": "2023-11-07 14:35:00",
        "Tag": "{\"location\" : \"default\"}"
    }]
}

doris version 2.0.0-alpha1 仍然返回的是 “IP”

{
    "msg": "success",
    "code": 0,
    "data": {
        "href_columns": ["BackendId"],
        "parent_url": "/rest/v1/system?path=/",
        "column_names": ["BackendId", "Cluster", "IP", "HostName", "HeartbeatPort", "BePort", "HttpPort", "BrpcPort", "LastStartTime", "LastHeartbeat", "Alive", "SystemDecommissioned", "ClusterDecommissioned", "TabletNum", "DataUsedCapacity", "AvailCapacity", "TotalCapacity", "UsedPct", "MaxDiskUsedPct", "RemoteUsedCapacity", "Tag", "ErrMsg", "Version", "Status", "HeartbeatFailureCounter", "NodeRole"],
        "rows": [{
            "LastHeartbeat": "2023-11-14 12:47:36",
            "DataUsedCapacity": "23.272 KB",
            "ErrMsg": "",
            "__hrefPaths": ["/rest/v1/system?path=//backends/10003"],
            "BrpcPort": "8060",
            "BePort": "9060",
            "ClusterDecommissioned": "false",
            "Version": "doris-2.0.0-alpha1-Unknown", // doris version 2.0.0-alpha1
            "HeartbeatPort": "9050",
            "TabletNum": "31",
            "TotalCapacity": "198.903 GB",
            "HeartbeatFailureCounter": "0",
            "HostName": "baisui-test-1",
            "HttpPort": "8040",
            "Status": "{\"lastSuccessReportTabletsTime\":\"2023-11-14 12:46:49\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
            "SystemDecommissioned": "false",
            "IP": "192.168.28.200", // IP
            "UsedPct": "43.36 %",
            "Cluster": "default_cluster",
            "Alive": "true",
            "MaxDiskUsedPct": "43.36 %",
            "NodeRole": "mix",
            "AvailCapacity": "112.657 GB",
            "RemoteUsedCapacity": "0.000 ",
            "BackendId": "10003",
            "LastStartTime": "2023-11-14 12:39:21",
            "Tag": "{\"location\" : \"default\"}"
        }]
    },
    "count": 1
}

这个是 3.7.2 环境(Doris 1.2.6,见 Version 字段)返回的:

{
    "href_columns": ["BackendId"],
    "parent_url": "/rest/v1/system?path=/",
    "column_names": ["BackendId", "Cluster", "IP", "HostName", "HeartbeatPort", "BePort", "HttpPort", "BrpcPort", "LastStartTime", "LastHeartbeat", "Alive", "SystemDecommissioned", "ClusterDecommissioned", "TabletNum", "DataUsedCapacity", "AvailCapacity", "TotalCapacity", "UsedPct", "MaxDiskUsedPct", "RemoteUsedCapacity", "Tag", "ErrMsg", "Version", "Status", "HeartbeatFailureCounter", "NodeRole"],
    "rows": [{
        "LastHeartbeat": "2023-10-31 13:21:58",
        "DataUsedCapacity": "2.134 GB",
        "ErrMsg": "",
        "__hrefPaths": ["/rest/v1/system?path=//backends/61024"],
        "BrpcPort": "8060",
        "BePort": "9060",
        "ClusterDecommissioned": "false",
        "Version": "doris-1.2.6-rc03-Unknown",
        "HeartbeatPort": "9050",
        "TabletNum": "1113",
        "TotalCapacity": "1.999 TB",
        "HeartbeatFailureCounter": "0",
        "HostName": "172.16.102.133", // 此处为HostName
        "HttpPort": "8040",
        "Status": "{\"lastSuccessReportTabletsTime\":\"2023-10-31 13:21:13\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
        "SystemDecommissioned": "false",
        "IP": "172.16.102.133", // 此处为IP
        "UsedPct": "0.94 %",
        "Cluster": "default_cluster",
        "Alive": "true",
        "MaxDiskUsedPct": "0.94 %",
        "NodeRole": "mix",
        "AvailCapacity": "1.980 TB",
        "RemoteUsedCapacity": "0.000 ",
        "BackendId": "61024",
        "LastStartTime": "2023-08-02 17:46:54",
        "Tag": "{\"location\" : \"default\"}"
    }, {
        "LastHeartbeat": "2023-10-31 13:21:58",
        "DataUsedCapacity": "2.097 GB",
        "ErrMsg": "",
        "__hrefPaths": ["/rest/v1/system?path=//backends/61040"],
        "BrpcPort": "8060",
        "BePort": "9060",
        "ClusterDecommissioned": "false",
        "Version": "doris-1.2.6-rc03-Unknown",
        "HeartbeatPort": "9050",
        "TabletNum": "1107",
        "TotalCapacity": "1.999 TB",
        "HeartbeatFailureCounter": "0",
        "HostName": "172.16.102.134",
        "HttpPort": "8040",
        "Status": "{\"lastSuccessReportTabletsTime\":\"2023-10-31 13:21:45\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
        "SystemDecommissioned": "false",
        "IP": "172.16.102.134",
        "UsedPct": "0.94 %",
        "Cluster": "default_cluster",
        "Alive": "true",
        "MaxDiskUsedPct": "0.94 %",
        "NodeRole": "mix",
        "AvailCapacity": "1.980 TB",
        "RemoteUsedCapacity": "0.000 ",
        "BackendId": "61040",
        "LastStartTime": "2023-08-02 17:48:05",
        "Tag": "{\"location\" : \"default\"}"
    }, {
        "LastHeartbeat": "2023-10-31 13:21:58",
        "DataUsedCapacity": "2.171 GB",
        "ErrMsg": "",
        "__hrefPaths": ["/rest/v1/system?path=//backends/61046"],
        "BrpcPort": "8060",
        "BePort": "9060",
        "ClusterDecommissioned": "false",
        "Version": "doris-1.2.6-rc03-Unknown",
        "HeartbeatPort": "9050",
        "TabletNum": "1149",
        "TotalCapacity": "1.999 TB",
        "HeartbeatFailureCounter": "0",
        "HostName": "172.16.102.135",
        "HttpPort": "8040",
        "Status": "{\"lastSuccessReportTabletsTime\":\"2023-10-31 13:21:58\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
        "SystemDecommissioned": "false",
        "IP": "172.16.102.135",
        "UsedPct": "0.94 %",
        "Cluster": "default_cluster",
        "Alive": "true",
        "MaxDiskUsedPct": "0.94 %",
        "NodeRole": "mix",
        "AvailCapacity": "1.980 TB",
        "RemoteUsedCapacity": "0.000 ",
        "BackendId": "61046",
        "LastStartTime": "2023-08-02 17:48:15",
        "Tag": "{\"location\" : \"default\"}"
    }]
}

解决办法

需要使用最新的获取 BE 节点信息的接口: https://doris.apache.org/docs/admin-manual/http-actions/fe/backends-action
需要改造的代码: https://github.com/qlangtech/chunjun/blob/f429860efaf160b326c9931b1446536ca340c90e/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/FeRestService.java