Open bolliger32 opened 3 years ago
First of all, I don't think we have such a section. We are trying to aim for sensible default parameters or implement auto scaling solutions in case this is not possible. One example, the frequencies the workers hearbeat decrease with increasing number of workers. this is there to protect the scheduler from being flooded by low priority communication overhead.
That said, I wouldn't be surprised to hear that this is not working perfectly. Briefly browsing over the code, I would anticipate the scheduler-info update on client side to be the source of your trouble. This is configured via the option distributed.client.scheduler-info-interval
which is set to 2s by default and fetches a relatively large dict from the scheduler which lists all the workers and some of the workers attributes. I recommend to increase this one to a large value since this is not super critical and mostly used by widgets, afaik.
Can you try this one and see if it helps?
I'm wondering if it could be useful to have a "best practices for creating clusters with many workers" section of the docs which details some tips and tricks for workflows on large clusters
I'm not opposed to something like this but the question is what to put in. if it's just about the scheduler-info interval above, I'd prefer that we implement this to be adapted to cluster size automatically. If there are many of these tricks, some of which cannot do much about, that section would make a lot of sense
cc @jacobtomlinson and @jsignell since they have been putting a lot of thought into structuring of docs lately. Maybe there is a perfect place where you can add your experiences to :)
BTW: 1.5k workers is definitely one of largest setups I am aware of. If there are any other areas where you'd expect dask to scale better or be more stable, I'd welcome your thoughts. Feel free to open another ticket for a discussion if there is anything to share
Thanks so much @fjetter ! I will give that a try next time I run this workflow (probably in a couple days) and report back. I agree it sounds nicer to just implement logical defaults/auto-scaling parameters and avoid having to describe different settings to users. So if there's a way to auto-scale this particular parameter (and if this is indeed the cause of the notebook slowdown) than that seems fine and doesn't require a full documentation section. Maybe just count me as one user who has enough experience to think that a problem like this shoudl be solvable, but not enough to be able to figure out exactly what config parameters to tweak to get there. I'm not sure how many folks there are that fall into tha bucket, but for someone like me, docs could be helpful. That being said, I don't have much knowledge to contribute other than that which comes from periodically running into something that doesn't work smoothly and then asking about it...
Overall, I think this large worker setup runs surprisingly well. The slow and choppy communication is the main issue. The rest of the issues I often run into probably stem more from our kubernetes backend running on preemptible google cloud nodes.
scheduler.allowed_failures
tag.client.restart()
(or by a better workflow that doesn't involve client.scatter()
)Feedback like this is always appreciated and I can only encourage you to open tickets in case you are spending a lot of time working around limitations. We often lack the necessary feedback in these situations since there are not many users running on that scale. You are not the only one and the user base continues to grow on that front but that makes it even more important for us to get the feedback to preemptively fix these issues. Below a few inline comments to your feedback.
So if there's a way to auto-scale this particular parameter (and if this is indeed the cause of the notebook slowdown)
If that parameter turns out to be a problem we could do the same thing as for the worker hearbeat. that's easy, we would like to have confirmation before we do anything like this, though.
Having more workers means we have more nodes, which means higher likelihoods for one of them getting preempted. This can cause some frequent killedWorker errors, so I often bump up the scheduler.allowed_failures tag.
That's likely one of the topics we would cover in the "large deployment" section since this is something hard to auto-configure. that also depends on the kind of workloads and the deployment you're running (are you susceptible to out of memory? are you running on spot instances? etc.)
It took me awhile to figure out that with dask-gateway, you have to do this by sending environment variables to the scheduler, as simply setting your local dask.config before instantiating a dask-gateway scheduler does not do the trick.
This configuration needs to be set before the scheduler is instantiated. I would probably open a ticket over in dask-gateway and ask if this is the "best approach" or if there are other options. I'm not too familiar with dask-gateway myself. If there was a need to make this parameter dynamic, that would also be possible but I don't think this was a common use case.
Other than that, there are just weird network errors that pop up from time to time
Another paramter which is increased often by heavy users is the distributed.comm.timeouts.connect
which is currently set to 30s but a month ago was lower by default. This may help if you see errors like "connect timed out during handshake" or simple "connect timeout" messages.
it can hang indefinitely I think b/c of some workers maybe going offline or coming online during that scattering process (?)
I would expect this to eventually run into a timeout if workers disconnect. For extremely large deployments, I'm not sure if scatter is robust or efficient. By default, scatter will open a connection to every worker and copy the data. For 1.5k workers that's a lot of work for your client. Instead, what it should try doing is to put the data on a few workers and start a tree replication from there. We're working on improving the replication mechanics with Active Memory Management (AMM; see https://github.com/dask/distributed/issues/4982 and https://github.com/dask/distributed/pull/5111 ) Until then, not scattering and letting the workers figure it out might actually be faster. Optimal case is probably if you only scatter to a few workers (10? 100?) and let distributed figure out the rest. even if every job requires the data, the ordinary machinery should replicate this data everywhere eventually.
it can hang indefinitely
We are having issues with actual deadlocks recently due to some low level race conditions int he task state machine. It's hard to tell if your situation is one of those. There have been a few fixes in the latest release. If you are not, yet, on the latest version, I can encourage upgrading. More fixes are planned to help with these things
I feel like my workflows are often "too embarrassingly parallel" for dask
There is no such thing :) We've had some major improvements for the scheduling of such workloads in the recent past, see https://github.com/dask/distributed/pull/4967 (there are a few follow up PRs with a few fixes) which should make these things run much more smoothly.
but because of all of the optimization of communication that the dask scheduler is doing, these can be slow
The optimization in particular is probably the reason why your jobs run OOM on client side and take an insane amount of time being scheduled (and graph sorting, we cannot disable this atm). You might want to disable this entirely. That will likely cause more tasks to appear but maybe overall you're better off.
For DataFrames
we changed our optimization workflow recently such that you should not be bothered by this any longer. I think HLGs haven't reached arrays, yet, and you won't be able to benefit from them in xarray, yet :(
What version of dask(.distributed)
are you running on?
Regarding docs - I think this would belong in a short form "How do I..." or "FAQ" section. We don't necessarily have one of those yet, but the goal would be to put it somewhere where it can be easily indexed by search engines and people will find it.
@fjetter This is incredibly helpful and a reminder of how responsive and engaged the dask community is! Also a reminder for me to be a bit more proactive when I'm running into issues rather than simply settling for a hack and carrying on.
What version of dask(.distributed) are you running on?
I have been trying to keep up with the (very fast) speed of development so am currently running 2021.7.2 but I can't say that all of these issues have cropped up when using this version. It's possible some of these have been solved since the most recent 1 or 2 updates or it's possible I just haven't hit them recently. If/when I hit them again, I'll post specific issues with the version included
If that parameter turns out to be a problem we could do the same thing as for the worker hearbeat. that's easy, we would like to have confirmation before we do anything like this, though.
I'll keep you posted on this thread next time I run this workflow.
That's likely one of the topics we would cover in the "large deployment" section since this is something hard to auto-configure. that also depends on the kind of workloads and the deployment you're running (are you susceptible to out of memory? are you running on spot instances? etc.)
Agreed. Since I am potentially susceptible to running out of memory, my standard approach has been to run with allowed_failures=1
on a subset of jobs first to see if jobs are killing workers for memory reasons. When I confirm that they are not, then I bump this up to something like 10 just to make sure that jobs that are running on workers killed for preemptible node reasons don't trigger KilledWorker errors. Most of my jobs that fall into this use case are client.map
jobs and not native dask collections, and they typically are pretty tightly controlled in terms of memory (e.g. all data loading/processing tasks using the same size arrays), so an example subset usually seems to do the trick, except for when the memory allocation issue creeps in due to excessive fragmentation.
Speaking of this issue, another thing I've noticed with the recent notes on trimming unmanaged memory is that while the "debugging approach to invoking free
" always seems to drastically cut unmanaged memory, setting the distributed.nanny.environ.MALLOC_TRIM_THRESHOLD_
env variable doesn't seem to have nearly the same effect. As the docs are currently written, I interpret them as saying that setting this to 0 should result in basically the same footprint as if I were constantly executing that "debugging" snippet. But I haven't observed that level of reduced memory usage. Not sure if that's expected, and maybe should be clarified in the docs? Or if I'm maybe implementing this incorrectly.
This configuration needs to be set before the scheduler is instantiated. I would probably open a ticket over in dask-gateway and ask if this is the "best approach" or if there are other options
Will do.
Another paramter which is increased often by heavy users is the distributed.comm.timeouts.connect which is currently set to 30s but a month ago was lower by default. This may help if you see errors like "connect timed out during handshake" or simple "connect timeout" messages.
Anecdotally, I have noticed fewer of those issues lately. I'll keep an eye out and if I do see them will try bumping this parameter up
Optimal case is probably if you only scatter to a few workers (10? 100?) and let distributed figure out the rest. even if every job requires the data, the ordinary machinery should replicate this data everywhere eventually.
Makes sense and honestly I'm usually able to figure out a way to not use scatter at all and just load data within every job. Most often the re-loading of this data with each job is worth the stability of not dealing with a future of a scattered object and the potential issues in passing this data across the network
We are having issues with actual deadlocks recently due to some low level race conditions int he task state machine. It's hard to tell if your situation is one of those. There have been a few fixes in the latest release. If you are not, yet, on the latest version, I can encourage upgrading. More fixes are planned to help with these things
I think i've experienced this on the latest release, but perhaps it was 2021.7.1. Will raise an issue again if I encounter this.
There is no such thing :) We've had some major improvements for the scheduling of such workloads in the recent past, see #4967 (there are a few follow up PRs with a few fixes) which should make these things run much more smoothly.
This is great! Will keep improvements like this in mind and continue to test how these sorts of workflows are running
The optimization in particular is probably the reason why your jobs run OOM on client side and take an insane amount of time being scheduled (and graph sorting, we cannot disable this atm). You might want to disable this entirely. That will likely cause more tasks to appear but maybe overall you're better off. For DataFrames we changed our optimization workflow recently such that you should not be bothered by this any longer. I think HLGs haven't reached arrays, yet, and you won't be able to benefit from them in xarray, yet :(
I read this blog post about new strategies for scheduler-side optimization and have experimented with their suggestion of trying setting the optimization.fuse.active parameter to false but haven't noticed too much difference in this experience. Look forward to these updates propagating to xarray!
@jsignell if it would be helpful, I'm happy to start compiling this list of experiences into something more cohesive that could go in something like a "How do I...utilize a very large cluster" section. I don't have the bandwidth this week, but could get started on that soon if you think having a section like that would be useful.
That sounds useful to me if/when you have the time. It would probably also make a good blog post if that appeals to you.
Yeah I could probably scrounge together something like that! Would you be the right person to stay in touch with about how to structure that?
setting the distributed.nanny.environ.MALLOC_TRIMTHRESHOLD env variable doesn't seem to have nearly the same effect.
We're still collecting experience ourselves for this mechanism. In the end, we're relying on the libc implementation you have running so there may be differences depending on what your OS is currently using. We can update the documentation but the question is what to put in there. We're, more or less, simply referencing the official malloc / mallopt documentation
I'm usually able to figure out a way to not use scatter at all and just load data within every job. Most often the re-loading of this data with each job is worth the stability of not dealing with a future of a scattered object and the potential issues in passing this data across the network
My point is rather that you should not necessarily worry about this data replication yourself. If you just create one task which loads this data, dask will implicitly replicate it to the workers which need this data. The network load for this replication is spread evenly on all workers who have the data at a given point in time. If you scatter the data, you are pushing this data from your client to all workers yourself, i.e. your client maintains a connection to all 1.5k workers and pushes the data.
If you let dask figure this out you might have a better time. The default behaviour can sometimes be slow since the worker with the initial piece of data limits requests to distributed.worker.connections.incoming
connections (default 10). I.e. in the first "stage", one worker has the data, the second stage 11, then 121, etc.. (continuously, of course, there are no actual stages just individual transfers with a throttle). If you want to scatter, don't scatter to all workers but rather to a few to kickstart this growth.
Increasing this incoming connection number may be helpful but might also put more memory pressure on your workers since they need to handle more concurrent request and copy more data.
Sure you can ping me, but you can also open a PR anytime at https://github.com/dask/dask-blog
That said, I wouldn't be surprised to hear that this is not working perfectly. Briefly browsing over the code, I would anticipate the scheduler-info update on client side to be the source of your trouble. This is configured via the option distributed.client.scheduler-info-interval which is set to 2s by default and fetches a relatively large dict from the scheduler which lists all the workers and some of the workers attributes. I recommend to increase this one to a large value since this is not super critical and mostly used by widgets, afaik. Can you try this one and see if it helps?
@fjetter Took me awhile to get back to this but this advice was great! Using 30s for this parameter, I've been running a notebook that contains a client connected to a ~2000 worker cluster with basically no slowdowns in executing cells in the notebook.
Still trying to carve out some time for a blog post on these general best practices...
I don't think this is an existing issue or anywhere in the docs, but my apologies if it is already mentioned somewhere. I occasionally run jobs on 1500+ worker clusters (using dask-gateway with the Kubernetes backend) from an interactive jupyterlab interface. When I do this, the notebook response time slows down considerably and I'm assuming this has to do with some extra communication thats occurring between client and scheduler due to the extra worker(?) If that's true, I can see a number of configuration parameters that might improve this situation by slowing down communication frequency, but I'm not exactly positive which ones to alter. I'm wondering if it could be useful to have a "best practices for creating clusters with many workers" section of the docs which details some tips and tricks for workflows on large clusters. It's also possible there is no easy answer to this question, in which case feel free to close this feature request.