Open mrocklin opened 6 years ago
Just for completeness, when I mentioned this I was thinking more about it in a dask-jobqueue context, but it would be nice to have a feature like this in distributed for sure!
Just for completeness, a few remarks about the dask-jobqueue context I was thinking of
create_worker
and override the parameters that were passed in the JobQueueCluster
constructor). In this context, adaptivity would be nice to have but not crucial I reckon.I'll just say that I generally like this idea. Currently, in jobqueue, it seems like it might be easier to create multiple Clusters with different configurations but if there were a nice way to orchestrate this sort of functionality within a Cluster, that would be cool.
Another use case is creating separate workers for I/O and compute (see for e.g. http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/).
The ability to distinguish between the two would be useful for ensuring that "the cores are always fed".
Given the description above, would it be possible to create workers with separate I/O and compute threads? In the current distributed paradigm, one might specify these workers like so:
dask-worker scheduler:8786 --resources "io=32; compute=8" --nprocs=1 --nthreads=40
However, manually specifying io/compute resources for each task is somewhat laborious and error prone.
Another use case is creating separate workers to download data from third party servers with a limited number of connections. To make things more interesting, this could also be a max number of connections per ip, and thus per worker node/machine/instance. So when running on e.g. a single 32 core node, one would have 32 cpu-bound workers and for example 8 download workers for a certain server with max. 8 connections/ip. When adding a new node to the cluster, the number of cpu workers would always scale up with the extra number of cpus. However, the number of download workers would always increment with 8.
A couple things that would be nice for my use case:
1) I also have CPU and GPU workers and would like a less strict resource mapping as my tasks can still be run on CPU machines but slower. In Kubernetes they have the concept of affinity where a pod can prefer to be scheduled on a certain node but still be run on other nodes/contexts if that one doesn't exist. It would be nice to have an analogous concept with tasks.
2) I would also like having the ability to create a node or worker pool that is reserved for time sensitive tasks. My cluster needs to process some jobs as fast as possible and I would like it if I could reserve node(s) and possibly monitor them separately from the rest of the workload. Going back to Kubernetes, there is the concept of taints which prevent pods from being placed on a node. Again would be nice to have an analogous concept for the tasks.
Curious how these multiple worker types could or could not support this.
I think this issue has some relation to https://github.com/dask/distributed/issues/2127, where @kkraus14 wants to run certain tasks on CPU/GPU workers. I've also wanted to run tasks on specific workers, or require resources to be exclusive for certain tasks.
Currently, these task dependencies must be specified as additional arguments to compute
/persist
etc. rather than at the point of actual construction -- embedding resource/worker dependencies in the graph is not currently possible.
To support this, how about adding a TaskAnnotation
type? This can be a namedtuple, itself containing nested tuples representing key-value pairs. e.g.
annot = TaskAnnotation(an=(('resource', ('GPU': '1'), ('worker', 'alice')))
dask array graphs tend to have the following structure:
dsk = {
(tsk_name, 0) : (fn, arg1, arg2, ..., argn),
(tsk_name, 1) : (fn, arg1, arg2, ..., argn),
}
How about embedding annotations within value tuples?
dsk = {
(tsk_name, 0) : (fn, arg1, arg2, ..., argn, annotation1),
(tsk_name, 1) : (fn, arg1, arg2, ..., argn, annotation2),
}
If the scheduler discovers an annotation in the tuple, it could remove it from the argument list and attempt to satisfy the requested constraints. In the above example, annotations are placed at the end of the tuple, but the location could be arbitrary and multiple annotations are possible. Alternatively, it might be better to put them at the start.
I realise the above example is somewhat specific to dask arrays (I'm not too familiar with the dataframe and bag collections) so there may be issues I'm not seeing.
One problem I can immediately identify would be modifying existing graph construction functions to support the above annotations (atop/top support is probably the first place to look).
The cluster object is about starting and stopping workers, not about assigning workers to tasks. I don't think that this is related.
@mrocklin Ah sorry. If you think the above is useful, should the discussion be moved to #2127 or a new issue be opened?
EDIT: Created issue in https://github.com/dask/dask/issues/3783
I recommend a new issue.
On Thu, Jul 19, 2018 at 8:29 AM, Simon Perkins notifications@github.com wrote:
@mrocklin https://github.com/mrocklin Ah sorry. If you think the above is useful, should the discussion be moved to #2127 https://github.com/dask/distributed/issues/2127 or a new issue be opened?
— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2118#issuecomment-406259390, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszMNokB2LZMBwaFcWWgSeIjYWWHLBks5uIHuygaJpZM4VQO52 .
I cannot figure out how does one deal with network/compute-bound tasks now. I use ThreadPoolExecutor
for network tasks and then feed results to ProcessPoolExecutor
workers to compute. If I get this right I need to start two LocalCluster
s to do the same thing in Dask.
Is there a particular workload that you're having trouble with? If not then I recommend using the defaults until difficulties arise.
On Mon, Feb 4, 2019 at 12:55 PM Makarov Andrey notifications@github.com wrote:
I cannot figure out how does one deal with network/compute-bound tasks now. I use ThreadPoolExecutor for network tasks and then feed results to ProcessPoolExecutor workers to compute. If I get this right I need to start two LocalClusters to do the same thing in Dask.
— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2118#issuecomment-460367555, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszH32WMeU-rssmaClG0np9d8ICC_cks5vKIIygaJpZM4VQO52 .
@mrocklin well, currently I load html pages using 10 parallel threads, then parse and process results with BeautifulSoup using 5 worker processes. Of course number of workers can be tuned up, but in general in the 1st case we mostly wait and in the second one mostly compute.
As I understand I have to use different clusters for this workflow (and therefore separate dashboards, if I want to monitor the whole process somehow)
I recommend using the defaults until difficulties arise
Do you mean just use one cluster, tune up its settings and look at the overall execution time?
Yeah, the default settings on LocalCluster will give you a mix of threads and process that I suspect will be decent for your workload. I would try it out and see how it performs before worrying too much about it.
On Mon, Feb 4, 2019 at 11:28 PM Makarov Andrey notifications@github.com wrote:
@mrocklin https://github.com/mrocklin well, currently load html pages using 10 parallel threads, then parse and process results with BeautifulSoup using 5 worker processes. Of course number of workers can be tuned up, but in general in the 1st case we mostly wait and in the second mostly compute.
As I understand I have to use different clusters for this workflow (and therefore separate dashboards, if I want to monitor the whole process somehow)
I recommend using the defaults until difficulties arise Do you mean just use one cluster, tune up its settings and look at the overall execution time?
— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2118#issuecomment-460538686, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszG1scqJjyDeQrAARu6yHzrPPiGf3ks5vKTKFgaJpZM4VQO52 .
@mrocklin Worker pools like same as airflow implementation could certainly be useful.
The use case I encountered was
Another scenario: you have a worker size that accommodates 95% of tasks or higher, but the occasional pathological cases will use 2x memory. I'd love to retry tasks that were killed by the nanny, giving them each time X% more memory. This assumes dask-kubernetes and some kind of autoscaling to bring up beefier machines on demand.
Hello 👋 I'm interested in this topic since the project I'm working on has need of different types of resources per worker.
@mrocklin @lesteve @jhamman @jacobtomlinson Is there still interest in this? and if so could someone point me in the direction where improvements/implementations could be made?
The various Cluster objects often allow the user to provide specifications of a worker (cores, memory, software environment, ...) and then provides mechanisms around increasing and decreasing the number of workers.
However sometimes a dask deployment has a few different kinds of workers, for example machines with GPUs or high memory, or machines from a queue that is more or less expensive or reliable in some way.
This suggests that maybe the Cluster object should accept a list of worker pools, and provide common functionality around them.
Things like the widget are easy to scale to multiple pools. Adaptivity is a bit weirder.
cc @lesteve @jhamman (dask-jobqueue) @jcrist (dask-yarn) @jacobtomlinson (dask-kubernetes)
Credit for this thought goes to @lesteve