StarRocks / starrocks-connector-for-apache-flink

Apache License 2.0
192 stars 154 forks source link

[Enhancement] Support to configure socket timeout #319

Closed banmoy closed 9 months ago

banmoy commented 9 months ago

What type of PR is this:

Which issues of this PR fixes :

Fixes #

Problem Summary(Required) :

Add a configuration sink.socket.timeout-ms. It's the timeout in milliseconds that the http client waits for data or, put differently, a maximum period inactivity between two consecutive data packets. The default value -1 is same as that of apache http client which is interpreted as undefined (system default if applicable). A timeout value of zero is interpreted as an infinite timeout. You can use this option to fail the stream load from the connector side if the http client does not receive response from StarRocks before timeout. The other option sink.properties.timeout take effects on the StarRocks side, but the response to the client may delay in some unexpected cases. If you want to have a strict timeout from the connector side, you can set this option to an acceptable value.

After reaching the timeout, there will be an exception like this

java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_345]
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_345]
    at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_345]
    at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_345]
    at com.starrocks.streamload.shade.org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
    at com.starrocks.streamload.shade.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[starrocks-stream-load-sdk-1.0-SNAPSHOT-jar-with-dependencies.jar:?]

Checklist: