ray-project / rayfed

A multi-party, distributed execution engine based on Ray that helps you build your own federated learning frameworks in minutes.
https://rayfed.readthedocs.io
Apache License 2.0

[PoC] Cross-silo error broadcasting #175

Closed NKcqx closed 10 months ago

NKcqx commented 11 months ago

Background

Before this PR, when the execution of a DAG encounters an error in 'alice', the following happens: image

In alice, both the main thread and the data-sending thread raise the error, and the process exits. Bob needs the input from 'alice', so it waits for 'alice' forever, whether or not 'alice' is still alive.

Therefore, we need a mechanism to inform the other participants when the DAG execution raises an error.

What's in this PR

The graph below shows what happens after this PR: image

In alice, when the data-sending thread finds a RayTaskError indicating an execution failure, it wraps the error in a RemoteError object and sends that to bob in place of the original data object. In bob, the main thread polls data from the receiver actor, finds that the data is of type RemoteError, and re-raises it, so bob gets an exception just as "alice" does.
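The flow above can be sketched in plain Python. This is only an illustration, not the code in this PR: the `RemoteError` class here is a hypothetical stand-in with made-up fields, a `queue.Queue` stands in for the cross-party channel, and a generic `Exception` stands in for `RayTaskError`.

```python
import queue


class RemoteError(Exception):
    """Hypothetical wrapper that carries a failure from one party to another."""

    def __init__(self, src_party, cause):
        super().__init__(f"task failed in party {src_party!r}: {cause!r}")
        self.src_party = src_party
        self.cause = cause


def prepare_payload(src_party, produce):
    """Sender side: on failure, substitute a RemoteError for the data."""
    try:
        return produce()
    except Exception as exc:  # stand-in for detecting a RayTaskError
        return RemoteError(src_party, exc)


def receive(channel):
    """Receiver side: re-raise the payload if it is a RemoteError."""
    payload = channel.get(timeout=1)
    if isinstance(payload, RemoteError):
        raise payload
    return payload


def failing_task():
    raise ValueError("bad input")


# A queue stands in for the link between 'alice' and 'bob'.
channel = queue.Queue()
channel.put(prepare_payload("alice", lambda: 42))
channel.put(prepare_payload("alice", failing_task))

first = receive(channel)  # normal data passes through unchanged
try:
    receive(channel)
    caught = None
except RemoteError as err:  # bob sees the failure instead of hanging
    caught = err
```

The point of replacing the data in place is that the receiver needs no extra channel: it only has to check the type of whatever arrives.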

The threading model in this PR is shown below: image

The explanation of the _atomic_shutdown_flag

When a failure happens, both the main thread and the data thread get the error and trigger the shutdown, which would execute the "failure handler" twice. The typical way to ensure the failure_handler is executed only once is to set a flag that records whether it has already run, and to guard that check with a threading.Lock because it is a critical section.

However, this causes a deadlock, as shown in the graph below. The data thread triggers the shutdown stage by sending a SIGINT signal, which is implemented by raising a KeyboardInterrupt error (step 8). To handle the exception, the OS suspends the current context of the process, including the threading.Lock acquired in step 6, and switches to the error handler, i.e. the signal handler in step 9. Since the lock has not yet been released, acquiring the same lock again causes the deadlock (step 10). image
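A minimal sketch of why this hangs, assuming the handler ends up running in a context that already holds the lock. No real signal is sent here; `signal_handler_body` is a hypothetical stand-in for the handler, and a timeout is used only so the demo terminates instead of blocking forever.

```python
import threading

lock = threading.Lock()  # non-reentrant, like the lock guarding the flag


def signal_handler_body():
    """Stand-in for the signal handler trying to take the same lock."""
    # Without the timeout this call would never return.
    return lock.acquire(timeout=0.2)


with lock:  # the lock is held when the "handler" interrupts
    reacquired = signal_handler_body()
```

`threading.Lock` is not reentrant, so the second acquire cannot succeed while the first holder is suspended.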

The solution is to check the lock before sending the signal. That lock is _atomic_shutdown_flag.
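One way to read "check the lock before sending the signal" is a non-blocking acquire used as a run-once gate. The sketch below is written under that assumption and is not the code in this PR: `trigger_shutdown` and `send_signal` are hypothetical names, and only `_atomic_shutdown_flag` comes from the description.

```python
import threading

_atomic_shutdown_flag = threading.Lock()
signals_sent = []


def trigger_shutdown(send_signal):
    """Send the shutdown signal at most once across all threads."""
    # Non-blocking acquire: a late caller gets False immediately instead of
    # waiting on a lock that will never be released.
    if not _atomic_shutdown_flag.acquire(blocking=False):
        return False
    send_signal()
    return True  # the lock is intentionally kept: shutdown happens once


def fake_send_signal():
    # Stand-in for sending SIGINT to the current process.
    signals_sent.append(threading.current_thread().name)


# Several threads detect the failure at about the same time.
threads = [
    threading.Thread(target=trigger_shutdown, args=(fake_send_signal,), name=f"t{i}")
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because no caller ever blocks on the lock, a handler that runs while the lock is held simply sees that shutdown is already in progress.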

fengsp commented 10 months ago

Good docs, maybe it is better we maintain a RayFed Enhancement Proposals directory and put this in it? So that these docs are better managed.

jovany-wang commented 10 months ago

Solid work! I'm going to review soon.

NKcqx commented 10 months ago

In this context, "out-of-band" refers to separating control messages, including the error message, from data messages.

NKcqx commented 10 months ago

Good docs, maybe it is better we maintain a RayFed Enhancement Proposals directory and put this in it? So that these docs are better managed.

I have filed a RayFed enhancement proposal, please take a look: https://github.com/ray-project/rayfed/pull/179

cc @jovany-wang

jovany-wang commented 10 months ago

Thanks. It got approved!