apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: ElasticsearchIO should use RetryPolicy in case of connectivity issues #30036

Open bvolpato opened 10 months ago

bvolpato commented 10 months ago

What happened?

ElasticsearchIO already handles 429 Too Many Requests properly, but high load on Elasticsearch can surface through several other symptoms.

For example, I've seen Connection reset and Broken pipe errors. Failing the bundle can hurt the pipeline when the user's intent is simply to retry the Elasticsearch operation.

It would be great to revisit the default RetryPredicate so that it also retries on those connectivity issues.
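
To illustrate the kind of classification I have in mind, here is a minimal, JDK-only sketch. It is not ElasticsearchIO code: `ConnectivityRetry`, `isRetryable` and `callWithRetry` are hypothetical names, and matching on the exception message ("Connection reset", "Broken pipe") is an assumption based on the errors I observed, not an exhaustive list. How this would plug into the existing RetryPredicate is left open.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper for illustration only; not part of ElasticsearchIO. */
final class ConnectivityRetry {

  private ConnectivityRetry() {}

  /**
   * Returns true if any exception in the cause chain is an IOException whose
   * message looks like a transient connection failure.
   */
  static boolean isRetryable(Throwable error) {
    int depth = 0;
    // Bound the walk so a cyclic cause chain cannot loop forever.
    for (Throwable t = error; t != null && depth < 16; t = t.getCause(), depth++) {
      String message = t.getMessage();
      if (t instanceof IOException
          && message != null
          && (message.contains("Connection reset") || message.contains("Broken pipe"))) {
        return true;
      }
    }
    return false;
  }

  /**
   * Calls the operation, retrying with exponential backoff while the failure
   * is classified as retryable and attempts remain. Other failures, and the
   * last retryable failure, are rethrown unchanged.
   */
  static <T> T callWithRetry(Callable<T> operation, int maxAttempts, long initialBackoffMillis)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    long backoffMillis = initialBackoffMillis;
    for (int attempt = 1; ; attempt++) {
      try {
        return operation.call();
      } catch (Exception e) {
        if (attempt >= maxAttempts || !isRetryable(e)) {
          throw e;
        }
        Thread.sleep(backoffMillis);
        backoffMillis *= 2;
      }
    }
  }
}
```

With something like this, a bulk flush that hits Connection reset by peer would be attempted again up to `maxAttempts` times before the bundle fails, while unrelated errors would still fail immediately.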

Issue Priority

Priority: 3 (minor)

Issue Components

bvolpato commented 10 months ago

Stack trace for one of the issues:

java.io.IOException: Connection reset by peer
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:935)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:300)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:288)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushBatch(ElasticsearchIO.java:2550)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2483)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.finishBundle(ElasticsearchIO.java:2444)
Caused by: java.io.IOException: Connection reset by peer
    at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
    at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:245)
    at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
    at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358)
    at org.apache.http.impl.nio.reactor.SessionInputBufferImpl.fill(SessionInputBufferImpl.java:231)