flink-extended / flink-remote-shuffle

Remote Shuffle Service for Flink
Apache License 2.0

The shuffle manager should restore the previously managed workers when re-election #8

Open jlon opened 2 years ago

jlon commented 2 years ago

The shuffle manager should restore the previously managed workers when the master is re-elected. Otherwise, in the next heartbeat cycle, when a worker is requested it will not be available, causing the job to fail. We should minimize the impact of ShuffleManager failures on running jobs.

wsry commented 2 years ago

@jlon Do you mean that the ShuffleManager should persist its state, such as all registered ShuffleWorkers, and recover that state when it restarts? Does that mean we need to depend on reliable external storage?
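
For illustration only, a minimal sketch of what such persistence might look like, assuming the state to keep is just a list of worker addresses and that a shared path (HDFS, a mounted volume, etc.) is available. The class and method names are hypothetical and not part of the project:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical sketch: persist and recover the list of registered worker addresses. */
public class WorkerRegistryStore {

    // In a real setup this would point to reliable shared storage (HDFS, mounted volume, ...).
    private final Path storePath;

    public WorkerRegistryStore(Path storePath) {
        this.storePath = storePath;
    }

    /** Write the current worker addresses, one "host:port" per line. */
    public void persist(List<String> workerAddresses) throws IOException {
        Files.write(storePath, workerAddresses, StandardCharsets.UTF_8);
    }

    /** Read the worker addresses back after a ShuffleManager restart; empty if nothing stored. */
    public List<String> recover() throws IOException {
        return Files.exists(storePath)
                ? Files.readAllLines(storePath, StandardCharsets.UTF_8)
                : List.of();
    }
}
```

Even this simple form illustrates the concern raised above: some reliable external storage has to exist and stay consistent across ShuffleManager restarts.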

jlon commented 2 years ago

@wsry I would like to contribute this feature. May I?

wsry commented 2 years ago

@jlon I am not sure if I understand your concern correctly, but I have some concerns about persisting and recovering ShuffleManager state because it may introduce extra complexity (relying on external storage?). I wonder if a standby ShuffleManager solution would be better? The standby solution can also enhance standalone deployments, which means we would not always have to rely on an external system (YARN, K8s) to start up a new ShuffleManager instance.
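
As a rough sketch of the standby idea, assuming ZooKeeper is available for leader election (the latch path and class names below are hypothetical, not the project's actual HA services), a standby instance could wait on a Curator LeaderLatch and take over without reading any persisted state:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/** Hypothetical sketch: a standby ShuffleManager waiting to take over via ZooKeeper. */
public class StandbyShuffleManager {

    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "zk-host:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // Placeholder latch path; a real deployment would use its configured HA root path.
        LeaderLatch latch = new LeaderLatch(zk, "/remote-shuffle/manager-leader-latch");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // This instance was elected: start serving and let workers re-register
                // (or contact them proactively) instead of reading state from storage.
                System.out.println("Became active ShuffleManager");
            }

            @Override
            public void notLeader() {
                System.out.println("Lost leadership, back to standby");
            }
        });
        latch.start();

        Thread.currentThread().join(); // keep the standby process alive
    }
}
```

With this approach the newly elected instance can simply rebuild its view from worker re-registrations, which also fits the standalone-deployment benefit mentioned above.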

wsry commented 2 years ago

@jlon BTW, I have sent a friend request on DingTalk, we can also discuss it offline.

gaoyunhaii commented 2 years ago

In the next heartbeat cycle, when a worker is requested it will not be available, causing the job to fail.

One more point to add: I think we might have to rely on retrying to solve this issue, unless we can ensure that an online ShuffleManager is always available, which might not be guaranteed even if we have persistent storage?
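
A minimal sketch of the client-side retrying referred to here, assuming a generic helper (not an existing API in this project) that retries an action with exponential backoff while the ShuffleManager is temporarily unavailable:

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical sketch of retry with exponential backoff. */
public final class RetryUtil {

    public static <T> T retryWithBackoff(Callable<T> action, int maxAttempts, Duration initialBackoff)
            throws Exception {
        Duration backoff = initialBackoff;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    // Wait before the next attempt, doubling the delay each time.
                    Thread.sleep(backoff.toMillis());
                    backoff = backoff.multipliedBy(2);
                }
            }
        }
        throw lastFailure;
    }
}
```

A caller could then wrap a resource request, e.g. `retryWithBackoff(() -> client.requestShuffleResource(jobId), 5, Duration.ofSeconds(1))`, where `requestShuffleResource` is a hypothetical call standing in for whatever the job uses to ask the ShuffleManager for workers.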

jlon commented 2 years ago

@gaoyunhaii In K8s mode, when the ShuffleManager is relaunched, we can query the list of worker pods under a fixed label through the Kubernetes API server. Since we can also get the IP of each worker pod, the ShuffleManager can proactively contact every worker in that list and request a heartbeat. In this way, the previously managed workers can be restored in time. However, I have not yet figured out how to query the previous containers in a YARN environment.
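
For reference, a sketch of the pod lookup described above using the fabric8 Kubernetes client; the namespace and label below are placeholders and would come from the actual deployment configuration:

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

import java.util.List;

/** Sketch: recover the worker list from the Kubernetes API server after a ShuffleManager restart. */
public class WorkerPodLookup {

    public static void main(String[] args) {
        // Placeholder label; the real deployment would use whatever label its worker pods carry.
        String workerLabelKey = "app";
        String workerLabelValue = "remote-shuffle-worker";

        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            List<Pod> workerPods = client.pods()
                    .inNamespace("default")
                    .withLabel(workerLabelKey, workerLabelValue)
                    .list()
                    .getItems();

            for (Pod pod : workerPods) {
                String podName = pod.getMetadata().getName();
                String podIp = pod.getStatus().getPodIP();
                // The restarted ShuffleManager could contact each worker at this IP
                // and ask it to re-register / report a heartbeat.
                System.out.printf("worker pod %s at %s%n", podName, podIp);
            }
        }
    }
}
```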