Closed: robertnishihara closed this issue 7 years ago.
Another use case is to load a dataset in a balanced way.
It is also similar to environment variables; maybe we can unify these two functionalities in some way, or provide a better mechanism that replaces both.
We also need to think a little about the API, i.e., which arguments the function should have (maybe the total number of nodes or workers and the current worker index). And how does it interplay with the scheduler? How do we make sure people don't abuse it to circumvent the scheduler?
I'd say add it with a disclaimer that it is experimental, encourage people to use it rarely and see what it gets used for.
As we discussed earlier, one elegant approach is to provide a single method for running a function on all workers, and to pass a counter into that function indicating how many workers on that machine (including the caller) have started executing it so far (using Redis to do the increment atomically).
So if there are 4 workers on one machine and we call something like `ray.run_function_on_all_workers(f)`, then one worker will call `f(1)`, one will call `f(2)`, one will call `f(3)`, and one will call `f(4)`.
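A minimal sketch of those counter semantics, using a lock-protected in-process counter in place of Redis (`run_function_on_all_workers` here is a local stand-in for illustration, not Ray's actual API):

```python
import threading

class AtomicCounter:
    """Stands in for a Redis INCR: increment and return the new value atomically."""
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def incr(self):
        with self._lock:
            self._value += 1
            return self._value

def run_function_on_all_workers(f, num_workers):
    """Hypothetical sketch: each worker on a machine atomically increments a
    shared counter and passes the result to f, so with 4 workers f is called
    with 1, 2, 3, and 4 (in some order)."""
    counter = AtomicCounter()
    results = []
    results_lock = threading.Lock()

    def worker():
        rank = counter.incr()  # how many workers, including this one, have started f
        out = f(rank)
        with results_lock:
            results.append(out)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

# With 4 workers, f receives each of 1..4 exactly once.
print(sorted(run_function_on_all_workers(lambda i: i, 4)))  # [1, 2, 3, 4]
```

The rank a worker observes tells it how many workers on its machine beat it to the function, which is exactly what "run this once per node" logic needs.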
This is a very interesting question/discussion. To address this specific use case, I think what you are looking for is an anti-affinity placement constraint: the ability to run one task per locality domain (e.g., per node, per worker, per rack). We could provide this functionality in the global scheduler, by extending the scheduler API to accept a bag of tasks (collectively defined as a job) and a placement constraint associated with that job. Then, when the placement decision is made for this bag of tasks, it will be done in a way that honors the constraint, or the job is atomically rejected if the constraint cannot be satisfied. Of course, it would be ideal to do this in a general way, as well as to make the placement constraint specification optional (perhaps even as a separate loadable module).
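A purely hypothetical sketch of that bag-of-tasks API (the names `Job`, `place`, and the `"one-per-node"` constraint string are made up for illustration; none of this exists in Ray's scheduler):

```python
from dataclasses import dataclass

@dataclass
class Job:
    tasks: list        # the bag of tasks submitted together as one job
    constraint: str    # e.g. "one-per-node" (anti-affinity)

def place(job, nodes):
    """Honor the constraint for the whole bag, or reject it atomically."""
    if job.constraint == "one-per-node":
        if len(job.tasks) > len(nodes):
            return None  # atomic rejection: constraint cannot be satisfied
        # anti-affinity: each task lands on a distinct node
        return {task: node for task, node in zip(job.tasks, nodes)}
    # no constraint: any assignment is acceptable
    return {task: nodes[i % len(nodes)] for i, task in enumerate(job.tasks)}

print(place(Job(["t1", "t2"], "one-per-node"), ["n1", "n2", "n3"]))
print(place(Job(["t1", "t2", "t3", "t4"], "one-per-node"), ["n1", "n2"]))  # None
```

The key property is the all-or-nothing decision: either every task in the bag gets a placement that honors the constraint, or the whole job is rejected.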
Another take on this would be to approach the problem from the OS systems perspective. We could think about a basic primitive Ray could expose (for instance, a scoped atomic counter primitive backed by Redis) that enables ensembles of distributed Ray tasks to do leader election, for example. Counters could be node- or worker-scoped and persist for the lifetime of the task ensemble. It is easy to see how node-scoped atomic counters would enable "at most once per node" functionality, while worker-scoped atomic counters would enable "at most once per worker" functionality. So a Ray function that relies on some "once per worker" or "once per node" pre-processing would add a simple if statement that checks the worker-scoped or node-scoped atomic counter and calls the init() function if the counter is zero. The init() could either run in the same task context or as a separate task. The latter requires a mechanism that guarantees init() runs in the same locale as the caller, so some minimal placement/locality awareness is still needed here. BUT, we could make it relative (as opposed to absolute) to preserve the resource abstraction. The locality constraint could be supported in the form "same locale as me" (affinity) or "not same locale as me" (anti-affinity).
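A sketch of that scoped-counter idea, with an in-process dict standing in for Redis (`scoped_incr` and the scope keys are assumptions for illustration; in practice the increment would be a Redis `INCR` keyed by node or worker id):

```python
import threading

_counters = {}
_lock = threading.Lock()

def scoped_incr(scope):
    """Atomically increment the counter for a scope; returns the old value.
    A Redis-backed version would use INCR on a key derived from the scope."""
    with _lock:
        old = _counters.get(scope, 0)
        _counters[scope] = old + 1
        return old

init_calls = []

def init(node_id):
    init_calls.append(node_id)  # expensive per-node setup would go here

def task(node_id):
    # "At most once per node": only the first task to arrive on a node runs init().
    if scoped_incr(("node", node_id)) == 0:
        init(node_id)

for node in ["A", "A", "B", "A", "B"]:
    task(node)
print(init_calls)  # init ran once per node: ['A', 'B']
```

Swapping the `("node", node_id)` scope key for a worker id gives the "at most once per worker" variant with no other changes.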
Attempting to achieve everything we need by using what the system already provides is the way to go. As tempting as it is, I would discourage side-channel (i.e. internal/invisible) data/task propagation/distribution/broadcast. Thinking about and exposing expressive/composable basic system primitives will make Ray feel more and more like a microkernel!
Closing for now.
Just wanted to second this request. In my context, I want to set some defaults for the libraries I use (Python logging, NumPy print options, etc.) on all workers. I'm working around it by stuffing this code in init.py, which gets executed on all the workers, but this is pretty nasty.
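For concreteness, this is the kind of per-worker setup function being described, sketched with the standard library (the mechanism that would run it on every worker is exactly what this issue is asking for):

```python
import logging

def setup_worker():
    """Per-worker initialization: set library defaults that every worker
    process should share. This is the sort of function one would want to
    broadcast to all workers."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(process)d %(levelname)s %(message)s",
        force=True,  # replace any handlers a previous config installed
    )
    # NumPy print options, warning filters, etc. would go here as well.

setup_worker()
logging.getLogger(__name__).info("worker configured")
```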
Hi @robertnishihara, in the released version 1.13.0, the worker module doesn't have the ray.worker.get_global_worker() function. How can I run a function on all workers now?
Should we expose the ability to run a given function on all workers? This can be useful for a bunch of reasons (for example, properly setting the Python path on each worker).
Should we also expose the ability to run a given function once on each node? This can be useful for setting things up (e.g., installing some stuff, downloading some files, copying application files).
Of course, we shouldn't add any functionality without a very good reason...