apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Improve the recoverability of DoFn #27094

Status: Open (opened by BjornPrime 1 year ago)

BjornPrime commented 1 year ago

What would you like to happen?

During a recent Dataflow streaming job, a running DoFn received frequent 429 (Too Many Requests) responses from an overloaded Elasticsearch sink (written to with ElasticsearchIO). During this period, autoscaling was triggered by the growing backlog, but the workers did not re-establish their connections once the sink was back online, so the pipeline remained stuck.

Steps should be taken to make DoFns more resilient to these kinds of disruptions, so that they recover on their own without the pipeline having to be restarted manually.

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)


InigoSJ commented 1 year ago

To add more context: the connections are created in the @Setup method, which runs once per DoFn instance:

https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L2417

If the connection breaks, it won't be re-established until a new DoFn instance is created (because of new workers or, as in this case, a Dataflow Runner V2 update).


It's unclear to me how this could be improved. Moving the connection creation to startBundle would mean many more connections and lower performance.

The only approach I can think of is to check whether the connection is alive in startBundle and, if it's not, recreate it.
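To make the trade-off in the two paragraphs above concrete, here is a small plain-Java model (no Beam dependency) of one DoFn instance processing ten bundles while its connection breaks during the fourth bundle. The setup() and startBundle() methods only imitate the order in which a runner calls @Setup and @StartBundle; FakeConnection, ModelFn and the strategy names are hypothetical and are not part of ElasticsearchIO.

```java
// Plain-Java model of three connection strategies for one DoFn instance.
// All names are hypothetical; no Beam or Elasticsearch classes are used.
public class ConnectionStrategies {

    // Hypothetical stand-in for a client connection that can break.
    public static final class FakeConnection {
        private boolean alive = true;
        boolean isAlive() { return alive; }
        void breakConnection() { alive = false; }
    }

    public enum Strategy { SETUP_ONLY, EVERY_BUNDLE, CHECK_IN_START_BUNDLE }

    public static final class ModelFn {
        private final Strategy strategy;
        private FakeConnection connection;
        public int connectionsCreated = 0;
        public int bundlesFailed = 0;

        ModelFn(Strategy strategy) { this.strategy = strategy; }

        private FakeConnection createConnection() {
            connectionsCreated++;
            return new FakeConnection();
        }

        // Imitates @Setup: runs once per instance.
        void setup() { connection = createConnection(); }

        // Imitates @StartBundle: runs before every bundle.
        void startBundle() {
            boolean recreate = strategy == Strategy.EVERY_BUNDLE
                    || (strategy == Strategy.CHECK_IN_START_BUNDLE && !connection.isAlive());
            if (recreate) {
                connection = createConnection();
            }
        }

        // A bundle fails if it has to use a dead connection.
        void processBundle() {
            if (!connection.isAlive()) {
                bundlesFailed++;
            }
        }
    }

    // Runs `bundles` bundles on one instance; the connection breaks during bundle index `breakAt`.
    public static ModelFn run(Strategy strategy, int bundles, int breakAt) {
        ModelFn fn = new ModelFn(strategy);
        fn.setup();
        for (int i = 0; i < bundles; i++) {
            fn.startBundle();
            if (i == breakAt) {
                fn.connection.breakConnection();
            }
            fn.processBundle();
        }
        return fn;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            ModelFn fn = run(s, 10, 3);
            System.out.println(s + ": connections=" + fn.connectionsCreated
                    + ", failedBundles=" + fn.bundlesFailed);
        }
    }
}
```

In this model, creating the connection only in setup never recovers (7 failed bundles), creating one per bundle recovers but opens 11 connections, and checking liveness in startBundle recovers with only 2. How cheaply a real liveness check can be implemented is the open question.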

egalpin commented 1 year ago

Thanks for reporting, @InigoSJ / @BjornPrime 😊 Changing when and where connections are created is a good idea, as long as, as was mentioned, connections are not created unnecessarily or in an uncontrolled manner.
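One possible way to keep reconnection "controlled" is to throttle it. The sketch below is a hypothetical helper (not part of Beam or ElasticsearchIO) that recreates a connection at most once per minimum interval, so a flapping sink cannot trigger a reconnection storm. The clock is injected as a LongSupplier so the behavior can be tested without sleeping; System::nanoTime would be the natural choice outside of tests.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper: recreates a connection at most once per minInterval.
public final class ThrottledReconnector<C> {
    private final Supplier<C> factory;
    private final long minIntervalNanos;
    private final LongSupplier clock; // e.g. System::nanoTime; injectable for tests
    private C current;
    private long lastCreatedAt;
    private int created = 0;

    public ThrottledReconnector(Supplier<C> factory, Duration minInterval, LongSupplier clock) {
        this.factory = factory;
        this.minIntervalNanos = minInterval.toNanos();
        this.clock = clock;
        this.current = create();
    }

    private C create() {
        created++;
        lastCreatedAt = clock.getAsLong();
        return factory.get();
    }

    // Returns true if a new connection was created, false if the attempt was throttled.
    public boolean reconnectIfAllowed() {
        if (clock.getAsLong() - lastCreatedAt < minIntervalNanos) {
            return false; // too soon since the last connection was created
        }
        current = create();
        return true;
    }

    public C current() { return current; }

    public int connectionsCreated() { return created; }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock, in nanoseconds
        ThrottledReconnector<Object> r =
                new ThrottledReconnector<>(Object::new, Duration.ofSeconds(10), () -> now[0]);
        now[0] = Duration.ofSeconds(3).toNanos();
        System.out.println(r.reconnectIfAllowed()); // false: only 3 s since creation
        now[0] = Duration.ofSeconds(12).toNanos();
        System.out.println(r.reconnectIfAllowed()); // true
        System.out.println(r.connectionsCreated()); // 2
    }
}
```

A startBundle liveness check could call reconnectIfAllowed() instead of recreating unconditionally, which bounds how often new connections are opened.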

@InigoSJ Based on what was observed, do you happen to know whether there is a way to tell programmatically that a connection is alive? Maybe the code that handles bad statuses from ES (429s) would be a good spot to re-create connections?
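The suggestion to recreate the connection from the error-handling path could look roughly like the following sketch. Connection and send() are hypothetical stand-ins for a client call that returns an HTTP status; they are not ElasticsearchIO APIs. The writer marks the connection as stale after a 429 and creates a new one before the next write. A real implementation would also need to back off before retrying, which is omitted here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Sketch: recreate the connection from the code path that handles bad statuses.
public class RecreateOnErrorWriter {

    // Hypothetical client call that returns an HTTP-like status code.
    public interface Connection {
        int send(String document);
    }

    private final Supplier<Connection> factory;
    private Connection connection;
    private boolean stale = false;
    private int created = 0;

    public RecreateOnErrorWriter(Supplier<Connection> factory) {
        this.factory = factory;
        this.connection = create();
    }

    private Connection create() {
        created++;
        return factory.get();
    }

    // Writes one document and returns the status. After a 429 (Too Many Requests) the
    // connection is marked stale and replaced before the next write. Backoff is omitted.
    public int write(String document) {
        if (stale) {
            connection = create();
            stale = false;
        }
        int status = connection.send(document);
        if (status == 429) {
            stale = true;
        }
        return status;
    }

    public int connectionsCreated() { return created; }

    public static void main(String[] args) {
        // Scripted responses: the sink is overloaded for two writes, then recovers.
        Deque<Integer> responses = new ArrayDeque<>(List.of(200, 429, 429, 200, 200));
        RecreateOnErrorWriter writer = new RecreateOnErrorWriter(() -> doc -> responses.poll());
        for (int i = 0; i < 5; i++) {
            System.out.println("doc-" + i + " -> " + writer.write("doc-" + i));
        }
        System.out.println("connections created: " + writer.connectionsCreated()); // 3
    }
}
```

Because a 429 usually means the cluster is overloaded rather than that the connection is broken, recreating on every 429 may be more aggressive than necessary; combining it with backoff or a throttle seems safer.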

InigoSJ commented 1 year ago

Just as a side note, I have never used this IO outside of this GitHub issue, so I am definitely not an authority here.

I think the idea I mentioned above should work: check whether the connection is alive in StartBundle() and, if it's not, recreate it. I would do something like this:

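// Sketch only: Connection, isAlive() and createConnection() are placeholders, not existing ElasticsearchIO APIs.
class ElasticIO extends DoFn<InputT, OutputT> {
    private transient Connection connection;

    @Setup
    public void setup() {
        connection = createConnection();
    }

    @StartBundle
    public void startBundle() {
        // Recreate the connection only when the current one is no longer alive.
        if (!connection.isAlive()) {
            connection = createConnection();
        }
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
        // ...
    }

    private Connection createConnection() {
        // Same code as now
    }
}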

Again, I don't know the intricacies of the IO, so this may not be possible, but every other idea I can come up with seems to have more downsides than the current approach.