datavane / tis

Support agile DataOps Based on Flink, DataX and Flink-CDC, Chunjun with Web-UI
https://tis.pub
Apache License 2.0
989 stars 217 forks source link

添加flink连接同步timeout #298

Closed zyczw closed 7 months ago

zyczw commented 8 months ago

mysql -> doris 由于目标端doris在夜晚load 比较高,导致doris connnector 执行streamload 超时导致丢失数据。 期望在doris sink 端配置超时时间,或者,添加执行streamload 失败重试机制

zyczw commented 8 months ago

image 通过配置这个timeout参数,设置flink写入sink端的超时时间

baisui1981 commented 7 months ago

ChunjunDorisSinkFactory 中添加以下三个参数,用户可以自行定义连接超时异常处理参数

    @FormField(ordinal = 13, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.require})
    public Integer connectTimeout;// = loadConf.getRequestConnectTimeoutMs() == null ? DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : loadConf.getRequestConnectTimeoutMs();
    @FormField(ordinal = 14, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.require})
    public Integer socketTimeout;// = loadConf.getRequestReadTimeoutMs() == null ? DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : loadConf.getRequestReadTimeoutMs();
    @FormField(ordinal = 15, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.require})
    public Integer retries;// = loadConf.getRequestRetries() == null ? DORIS_REQUEST_RETRIES_DEFAULT : loadConf.getRequestRetries();