dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Scaling distributed dask to run 27,648 dask workers on the Summit supercomputer #3691

Open markcoletti opened 4 years ago

markcoletti commented 4 years ago

This GitHub issue is to track progress and resolve problems associated with scaling distributed dask to run on all the nodes of Oak Ridge National Laboratory (ORNL)'s Summit supercomputer. When I first shared what we were trying to do on the RAPIDS-GoAi Slack, Benjamin Zaitlen of Nvidia suggested that I create this GitHub issue; he communicated with the dask development team, and they proposed tracking the effort in the project's issue tracker.

At ORNL we are using distributed dask to evaluate unique driving scenarios using the CARLA driving simulator to improve a deep-learner model for driving a virtual autonomous vehicle. As part of this effort, we are stress testing dask on Summit by gradually adding more Summit nodes to submitted jobs, fixing problems at the point where a given number of nodes has caused issues, and then increasing the number of nodes to address the next set of problems.

As of April 9th, the maximum number of Summit nodes from which we’ve gotten a successful run is 402 nodes with six dask workers each, for a total of 2,412 dask workers. We have a job pending with 921 nodes that will push the total number of dask workers to 5,526 dask workers. Eventually we’d like dask running on all 4,608 Summit nodes, which would have 27,648 dask workers.

In parallel, we’re developing an open-source evolutionary algorithm framework, LEAP, with colleagues at George Mason University and MITRE that uses dask to concurrently evaluate individuals. We have used LEAP as the vehicle for stress testing dask on Summit. The first test problem was a MAX ONES instance where the fitness was the total number of ones in a given bit string. Since this problem was too trivial for stress testing, we created a new problem that had a single integer as the individual's phenome (that is, genes that have been decoded into something meaningful to the problem, in this case an integer) that would correspond to how many seconds to sleep during evaluation. This means an individual that had a genome of 10 bits of all ones would cause the dask worker to sleep for about 17 minutes during evaluation. Moreover, this was a maximization problem, so the longer the sleep periods, the fitter the individual. (We could probably call this test the UNDERGRAD problem. 😉 )
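To make that concrete, the evaluation function boils down to roughly this minimal sketch (illustrative only, not the actual LEAP code):

import time

def sleeper_fitness(genome):
    """Decode a bit-string genome to an integer and sleep that many seconds.

    The fitness is simply the number of seconds slept, so longer naps win
    (maximization).  A genome of 10 ones decodes to 1023 seconds, about 17 minutes.
    """
    seconds = int("".join(str(bit) for bit in genome), 2)
    time.sleep(seconds)
    return seconds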

In the comments below, I’ll detail what I’ve done so far, and relate some of the problems I’ve encountered on this journey. I welcome comments, tips, suggestions, criticisms, and help with the next inevitable set of problems associated with this non-trivial task. 🙂

Cheers,

Mark Coletti, ORNL

markcoletti commented 4 years ago

I'll be posting a follow-up shortly that shares what we've done, what we're doing now, and possible strategies for resolving future problems in scaling up.

mrocklin commented 4 years ago

I'm very excited to track this issue. Thank you @piprrr for recording your efforts (and of course, for the efforts themselves).

markcoletti commented 4 years ago

You should not use dask_jobqueue on Summit

Of course, it made sense to read up on what others may have done with regards to getting dask to work on Summit, and @mrocklin had described a journey to get a small number of dask workers going on Summit that would connect to a Jupyter notebook. His approach centered around using dask_jobqueue to submit jobs corresponding to dask workers. Unfortunately we had to discard this approach because the OLCF policy for Summit was to have a maximum of five running jobs, which meant there'd be an explicit upper bound of five dask workers. (And that presumes the scheduler and the actual main computational task share one of those batch jobs; otherwise the limit would be three dask workers. That is, one batch job would go to the scheduler, one each to three dask workers, and the remaining batch job to the dask client.)

As an aside, @mrocklin's article did mention turning off file locking in the dask configuration file to compensate for the fact that Summit mounts home directories as read-only during runs. (Otherwise those lock attempts would fail, which would obviously cause subpar performance.)
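I believe the relevant knob is the use-file-locking setting under the worker section; for what it's worth, something like this flips it programmatically rather than in the YAML file:

import dask

# Presumably the setting in question; the YAML equivalent lives under
# distributed -> worker -> use-file-locking in ~/.config/dask/distributed.yaml.
dask.config.set({"distributed.worker.use-file-locking": False})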

In previous work, we had used the python package schwimmbad to implement a kind of map/reduce approach for evaluating individuals in an evolutionary algorithm (EA). To support that, we had a single LSF job that had separate jsrun commands — one for the scheduler and another for the EA, itself. The underlying communication for schwimmbad between the workers and the EA was done via MPI calls.

Inspired by this, we similarly created a single LSF batch script that had a jsrun for the scheduler, another to spawn the workers, and finally one jsrun for the client, itself. Here is an example of such a script cribbed from one that I actually used on summit.

The first jsrun is this:

jsrun --smpiargs="none" --gpu_per_rs 0 --nrs 1 --tasks_per_rs 1 --cpu_per_rs 1 --rs_per_host 1 dask-scheduler --interface ib0 --no-dashboard --no-show --scheduler-file $SCHEDULER_FILE &

This runs the scheduler on a single arbitrary core and advises it to use the available high-speed InfiniBand connection. We also don't need the bokeh dashboard. (Though @mrocklin shows how to spin one up in the aforementioned article if you're interested.) The --smpiargs argument specifies that we are not using MPI, since dask uses cloudpickle over the wire to slosh things between the scheduler, workers, and client. (Which is a relief, because MPI can get a little hairy to deal with on Summit.) Note the closing & to run this task in the background.

The second jsrun fans out the dask workers.

jsrun --smpiargs="none" --nrs $NUM_WORKERS -e individual --stdio_stdout ${RUN_DIR}/worker_out.%h.%j.%t.%p --stdio_stderr ${RUN_DIR}/worker_error.%h.%j.%t.%p --tasks_per_rs 1 --cpu_per_rs 1 --gpu_per_rs 1 --rs_per_host 6 --latency_priority gpu-cpu dask-worker --nthreads 1 --nprocs 1 --interface ib0 --no-dashboard --reconnect --scheduler-file $SCHEDULER_FILE &

This fans out one worker per Summit "resource set", of which there are six per node in this example, essentially giving each worker a single GPU. Obviously you'll want to tailor the allocation to a configuration that makes sense for your problem. The -e individual, --stdio_stdout, and --stdio_stderr options capture the stdout and stderr of each worker in a separate file. The default is to throw everything into a single file, which makes poring over the output challenging. It's far better to be able to find the corresponding stdout/stderr file for a given worker and look directly at it for what you need when analyzing run results.

Resource sets don’t come up instantly

Workers don't come up instantly; in fact, it can take several minutes. I've had runs take 10 to 20 minutes to spin up all the dask workers.

This means you should delay a bit before starting the client. You can take a best guess at how long to sleep, or you can take out the guesswork with a simple script that waits until a certain number of workers have registered. With that in mind, I wrote wait_for_workers.py to wait for the scheduler to register a target number of workers and then gracefully exit, so that the jsrun for the dask client script can then execute.
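Stripped of its argument parsing and reporting, wait_for_workers.py amounts to roughly this (a sketch, not the script verbatim):

import sys
import time
from distributed import Client

def wait_for_workers(scheduler_file, target_workers, pause_time=5, max_wait_minutes=5):
    """Poll the scheduler until enough workers register, or until we give up."""
    client = Client(scheduler_file=scheduler_file)
    deadline = time.time() + max_wait_minutes * 60
    while time.time() < deadline:
        num_workers = len(client.scheduler_info()["workers"])
        print(f"{time.time():0.2f}\t{num_workers}")
        if num_workers >= target_workers:
            break
        time.sleep(pause_time)
    client.close()

if __name__ == "__main__":
    wait_for_workers(sys.argv[1], int(sys.argv[2]))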

One niggling detail: don't have that script wait for all the workers to register with the scheduler, because if any fail for any reason at all you will never reach the total you were looking for, and the script will hang for the entire run duration. Instead, shoot for a much lower number, say, half the total workers. That's enough for dask to start working on your tasks, and you'll be sure to always start your client.

So, this is how we wait for workers:

jsrun --smpiargs="off" --nrs 1 --cpu_per_rs 1 --gpu_per_rs 0 --rs_per_host 1 --latency_priority cpu-cpu python3 -u /ccs/proj/csc363/may/carla_imitation_learning/gremlin/wait_for_workers.py --verbose --target-workers $target_num_workers --pause-time 5 --maximum-wait-time 5 --scheduler-timeout 60 $SCHEDULER_FILE

Note there is no trailing & since we want to block until the target number of workers are registered with the scheduler. Also, this uses the same resource allocation as the scheduler — one CPU and no GPUs.

The last jsrun is for the actual dask client and similarly doesn’t have a trailing & because otherwise the batch job would finish right away without giving a chance for the client to do its thing.
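The client script itself just connects through the same scheduler file before submitting work; its skeleton looks something like this (the evaluation function and path are placeholders, not our actual EA):

from distributed import Client

def evaluate(individual):
    # Stand-in for the real EA evaluation (e.g., the sleeper problem above).
    return individual

client = Client(scheduler_file="/path/to/scheduler_file.json")
futures = client.map(evaluate, range(100))
results = client.gather(futures)
client.close()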

The final jskill is to ensure that we kill off the scheduler and workers when the client is finished.

In subsequent comments for this issue I’ll share some tuning tips and gotchas I’ve learned along the way.

markcoletti commented 4 years ago

I'll need to update the GitHub LSF script referred to above and make some minor edits, but I've got a meeting, so that will have to wait. :(

jrbourbeau commented 4 years ago

Thanks @piprrr for opening up this issue and documenting your process on Summit!

This means you should delay a bit before starting the client

Just as a side note, there's a Client.wait_for_workers method which may be useful when workers take a while to arrive
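For example (the path is a placeholder and the count is whatever fraction of workers you want to wait for):

from distributed import Client

client = Client(scheduler_file="/path/to/scheduler_file.json")
client.wait_for_workers(n_workers=100)  # blocks until at least 100 workers have registered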

mrocklin commented 4 years ago

You should not use dask_jobqueue on Summit

Are you saying that you shouldn't launch tens of thousands of jobs? Hrm, who knew? :)

I'm curious about why MPI would not have been a good choice here.

I'm also curious if there is some modification of dask-jobqueue that would make sense for very large jobs, but my guess is that when we get to the scale of Summit, every machine will be custom enough so that custom scripts provided by the HPC admin staff may be best. Would you agree with this? Or is there something that the Dask development community can do to make this job easier?

markcoletti commented 4 years ago

Thanks @piprrr for opening up this issue and documenting your process on Summit!

This means you should delay a bit before starting the client

Just as a side note, there's a Client.wait_for_workers method which may be useful when workers take a while to arrive

Well, now I feel foolish as that's exactly the kind of functionality I was looking for! Thank you! (So, this issue is already paying a dividend!)

The general guidance should still stand that you can wait for a subset of the total target workers to begin work. I've been recently bit by waiting for all the workers when some failed, which means waiting indefinitely. And, the larger the number of nodes allocated, the likelier some of those will fail.

markcoletti commented 4 years ago

You should not use dask_jobqueue on Summit

Are you saying that you shouldn't launch tens of thousands of jobs? Hrm, who knew? :)

Well, yeah, the folks in the OLCF would not be your biggest fan. ;)

That said, I've been thinking of what I call a "slow cooker" approach that would use dask_jobqueue: you have a long-running evolutionary algorithm that just submits a job for each evaluation. When the evaluation is done, the dask worker spins down to free up the limited cluster resource. Then when the EA wants to evaluate another individual, it submits another job for that task. That way you can share the cluster with other users while still getting results from the EA.

I've shared this idea with the folks running the local cluster here at ORNL, and they are eager to give that a go.
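Roughly, the idea would look like this with dask_jobqueue (queue, project, and resource numbers here are made up, and Summit's jsrun launcher would need its own customization):

from dask_jobqueue import LSFCluster
from distributed import Client

# Each dask worker is its own short-lived LSF job; the EA runs for a long time
# and the pool of workers grows and shrinks with the evaluation workload.
cluster = LSFCluster(cores=1, memory="16GB", walltime="02:00",
                     queue="batch", project="csc363")
cluster.adapt(minimum=0, maximum=5)  # respect the running-job limit
client = Client(cluster)

def evaluate_individual(individual):
    return sum(individual)            # stand-in for the real (expensive) evaluation

population = [[0, 1, 1], [1, 1, 1]]   # stand-in individuals
futures = client.map(evaluate_individual, population)
print(client.gather(futures))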

I'm curious about why MPI would not have been a good choice here.

I was bit on Titan in that it was difficult to get MPI to work well with schwimmbad. And then it happened again on Summit. Let's just say that I had to work with the OLCF support staff to find the correct magic environment variables to set to appease MPI. This is what we get when IBM decides to just write its own MPI implementation from scratch instead of porting over one of the well-established MPI libraries to the PowerPC architecture.

Mind, my last tussle with MPI on Summit was over a year ago when the machine still had its new car smell. It should (hopefully) be a lot easier to deal with MPI on Summit now.

I'm also curious if there is some modification of dask-jobqueue that would make sense for very large jobs, but my guess is that when we get to the scale of Summit, every machine will be custom enough so that custom scripts provided by the HPC admin staff may be best. Would you agree with this? Or is there something that the Dask development community can do to make this job easier?

Using the single batch script works for up to 402 nodes, but it will take some tuning of configuration variables to get things to work at larger numbers of nodes. For example, I'm having to play with bumping up timeouts and heartbeats to keep things together during runs. (And I fear I have more tweaking to do down that road.) I also have the scheduler running on just a single CPU, and am wondering if it would take advantage of one or two more CPUs. I'm thinking not, since the scheduler is single-threaded (I think). What do you think?

There might be other things the dask community can do, which hopefully we can explore here.

I'll be sharing more later, but I do have to revert to my other duties. I can't believe how much I've already written! Ha!

mrocklin commented 4 years ago

For example, I'm having to play with bumping up timeouts and heartbeats to keep things together during runs

To be clear, these are Dask settings? Ideally we would make this easier, either by publishing a more robust configuration that users could drop in as necessary, or maybe even by learning automatically what situation we're in and changing things around. For example, Dask heartbeats today will increase their delay automatically based on how many other workers there are (more workers, longer heartbeats). There are probably other things we can do to reduce the number of knobs that you had to turn.

Mind, my last tussle with MPI on Summit was over a year ago when the machine still had its new car smell. It should (hopefully) be a lot easier to deal with MPI on Summit now.

The nice thing about MPI is that then we share these issues with every other project. Probably OLCF folks have decent docs about how to run MPI well.

I also have the scheduler running on just a single CPU, and am wondering if it would take advantage of one or two more CPUs. I'm thinking not since the scheduler is single-threaded (I think). What do you think?

Yes, today the scheduler will mostly use just one CPU (you might throw on one more, just for compression/serialization etc, which can be offloaded to another thread). We could look at multi-core scheduling. That would be a fun project I think, but probably also a larger engagement than anyone is going to do in nights-and-weekends time. The other option here would be to start a major profiling effort around the scheduler, and see if there are hotspots. I wouldn't be surprised if there is some decent speedup still left to be found there.

markcoletti commented 4 years ago

Thanks @piprrr for opening up this issue and documenting your process on Summit!

This means you should delay a bit before starting the client

Just as a side note, there's a Client.wait_for_workers method which may be useful when workers take a while to arrive

I just checked here and didn't see that listed in the summary of member functions for the Client API at the top of that page. (Which is how I missed it the first go! :( ). Oddly enough, it is described in detail later on the same page.

jrbourbeau commented 4 years ago

Thanks for the heads up, I've added to the table at the top of the page over in https://github.com/dask/distributed/pull/3692

markcoletti commented 4 years ago

For example, I'm having to play with bumping up timeouts and heartbeats to keep things together during runs

To be clear, these are Dask settings? Ideally we would make this easier, either by publishing a more robust configuration that users could drop in as necessary, or maybe even by learning automatically what situation we're in and changing things around. For example, Dask heartbeats today will increase their delay automatically based on how many other workers there are (more workers, longer heartbeats). There are probably other things we can do to reduce the number of knobs that you had to turn.

I now worry that by manually setting things I've therefore disabled dynamic behavior that would have helped. Here is the distributed.yaml.gz I used.

It may be beneficial to reset to the default configuration, run with that, and then get feedback from you folks and suggested settings.
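In the meantime, here's an easy way to dump the effective (merged) settings a given process actually sees, which might make the comparison against the defaults easier:

import json
import dask

# Everything under the distributed: section after all YAML files and
# environment variables have been merged for this process.
print(json.dumps(dask.config.get("distributed"), indent=2, default=str))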

Do you guys have a Slack dedicated to dask distributed? It may be easier to do this interactively.

(Though it's nearly 5pm, so I'll be switching over to other tasking.)

mrocklin commented 4 years ago

And look at that, this issue is paying dividends for the Dask project already too :)


mrocklin commented 4 years ago

There is https://gitter.im/dask/dask , but you'll almost certainly get better response times on github (few of the devs track the real-time chat channel). If you want to go high bandwidth, I'm happy to engage with a video chat.

mrocklin commented 4 years ago

Also, in case you want to read a screed on Slack: http://matthewrocklin.com/blog/2019/02/28/slack-github :)

rcthomas commented 4 years ago

Hi @piprrr, thank you for posting this, it's been very interesting. A couple of short comments:

(1) We at NERSC think of dask-jobqueue as a solution for people who don't want to write batch scripts. We think some contributions there might make it more useful, but basically that's it. If you want all your workers at once this is just the wrong thing, and it makes no sense to me to sit there and wait for the resources to build up. Ideally you can start feeding work to it and new jobs just add workers. Some people can use that, but sometimes you just want everything at once and to get going. We don't think a blanket statement like "you shouldn't use it" is what we would say. But I've also seen the reflexive "I want to run Dask on a Cray" answer "oh you need dask-jobqueue" --- that is just not right either. :)

(2) Lately we have been really thinking that for "I need all my workers now, and I need them at scale" at NERSC the answer is dask-mpi and containers (Shifter). We're trying to package this up and reconcile that against our queues. This seems to be good for the 1K-worker class of Dask job. I've tried 8K sized clusters and I'm not able to keep the entire cluster alive but amazingly all the work got done.

Would love to get together and compare notes and learn from your large scale experiences.

mrocklin commented 4 years ago

I've tried 8K sized clusters and I'm not able to keep the entire cluster alive but amazingly all the work got done

I would be curious to hear more about this.

On another note, what is the right way to build some sense of community among "HPC admins running Dask". There are a few groups doing this, what would help you all to share information?


rcthomas commented 4 years ago

@mrocklin The 8K run was a scaling experiment in the earlier days of Cori where I pulled out all stops, used containers, upped all timeouts I could find. During the run, at no given instant were all the workers alive, I don't think it ever achieved 8K, but all tasks completed. I did not expect this really to work, and it's against all the guidance I had to that point about whether what I was doing made sense. <4K workers seemed more reliable for the simple test problem I was doing.

This was a couple years ago now, it was a Friday afternoon experiment, and Dask was more youthful. If we were to repeat that kind of experiment today I don't know what we would find. Most experiments I've tried, or demo'd, or showed you (thanks) were at the hundreds-to-thousand worker level and this is mostly just fine these days and seems appropriate to the problems we're looking at.

On community. What has worked for Jupyter in HPC, I think, is (1) we had a workshop that helped us learn who was doing what at what facility and (2) out of that we were able to put together a few interest networks of people who periodically get together to chase issues, discuss developments, share lessons learned and best practices, and plan future events. What has not worked well is mailing lists (they just go silent); a Slack channel was set up but also doesn't see much traffic. Vendor engagement is a plus.

The Dask developer workshop held a couple months ago went some distance and I was able to make connections and build understanding of what the community is doing but HPC representation was smallish and there is (for now, but my guess is it won't last forever) a gap between enterprise and HPC Dask. Seeing what's going on in enterprise is very exciting and educational but not always immediately applicable.

jakirkham commented 4 years ago

@rabernat @jhamman , I’m curious if either of you explored this upper bound of workers and how many you were able to keep running at a time 🙂

markcoletti commented 4 years ago

I'm chuffed that this issue has spawned a lively conversation, and hopefully one that can be continued, if not here, then perhaps at a more appropriate venue. I'm definitely curious about distributed dask conferences or workshops; hopefully I can participate in a future one, post-pandemic.

A future possibility is UCAR's Software Engineering Assembly's annual Improving Scientific Software Conference and Tutorials. It's canceled this year because of COVID, of course, but it would be an excellent opportunity next year to talk about distributed dask to the scientific community, and to offer distributed dask related tutorials.

https://sea.ucar.edu/conference/2020

markcoletti commented 4 years ago

Latest 921 node issues on Summit

And now to share the results of the latest batch job on Summit where I am struggling to get six workers per node on a total of 921 nodes to do some work.

The first diagnostic is this:

distributed.scheduler - INFO - Receive client connection: Client-8231f578-7d52-11ea-ac20-70e284145d13
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Scheduler for 33.24s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

What's interesting is that this happens at the start of the run, and the Client in question is my "wait for workers" script that doesn't submit tasks; it just waits for a certain number of workers to spin up, or for a certain amount of time to go by, whichever comes first, and then gracefully exits so that the actual Client with real tasks can get started. So I dunno what the scheduler is doing for 33 seconds.

Then there are a lot of these, which is normal and to be expected:

distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.13.30:43583', name: tcp://10.41.13.30:43583, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.41.13.30:43583

In the middle of all the worker registration messages, I see this:

distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.16.205:37069', name: tcp://10.41.16.205:37069, memory: 0, processing: 0>
task: <Task pending coro=<Client._update_scheduler_info() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/client.py:1098> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x200002ee6df8>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/tornado/ioloop.py:690]>
Task was destroyed but it is pending!
task: <Task pending coro=<Client._update_scheduler_info() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/client.py:1098> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x200002f22c48>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/tornado/ioloop.py:690]>
Task was destroyed but it is pending!
task: <Task pending coro=<BaseTCPConnector.connect() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/tcp.py:349> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x20001003a1f8>()]> cb=[_release_waiter(<Future pendi...002f22e88>()]>)() at /autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/asyncio/tasks.py:362]>
Task was destroyed but it is pending!
task: <Task pending coro=<BaseTCPConnector.connect() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/tcp.py:349> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x20001003a9d8>()]> cb=[_release_waiter(<Future pendi...002ee6df8>()]>)() at /autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/asyncio/tasks.py:362]>
Task was destroyed but it is pending!
task: <Task pending coro=<BaseTCPConnector.connect() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/tcp.py:349> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x20001003aaf8>()]> cb=[_release_waiter(<Future pendi...002f22c48>()]>)() at /autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/asyncio/tasks.py:362]>
distributed.core - INFO - Event loop was unresponsive in Scheduler for 13.06s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

Then I see signs that the wait for worker process has ended successfully, and the actual Client that's to do real work, which is an evolutionary algorithm, is spinning up:

distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.19.232:38707', name: tcp://10.41.19.232:38707, memory: 0, processing: 0>
INFO:__main__:workers: 176 init pop size: 5526 max births: 27630, bag size: 5526
INFO:root:Using a remote distributed model
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:Using selector: EpollSelector
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.41.19.232:38707
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.13.94:42167', name: tcp://10.41.13.94:42167, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.41.13.94:42167

Mind, workers are still getting registered, which is OK. Since dask is essentially a producer/consumer queuing system, we can already give existing workers something to do while the rest register and warm up.


And this is the EA flaming out, which suggests that the Client threw an exception (hence the client being undefined):

distributed.scheduler - INFO - Register worker <Worker 'tcp://10.41.10.121:34115', name: tcp://10.41.10.121:34115, memory: 0, processing: 0>
CRITICAL:__main__:Timed out trying to connect to 'tcp://10.41.5.66:8786' after 30 s: Timed out trying to connect to 'tcp://10.41.5.66:8786' after 30 s: connect() didn't finish in time
Traceback (most recent call last):
  File "/ccs/proj/csc363//may/carla_imitation_learning/gremlin/summit/sleeper.py", line 132, in <module>
    client.close()
NameError: name 'client' is not defined

All told, 3,935 workers were registered, according to a grep of the output, when we wanted 5,526. However, what likely happened is that the client failed and then killed the whole job while the balance of workers were still coming up. I feel that the problem was that the client couldn't connect to the scheduler within 30 seconds and failed.

To resolve this issue, I'm going to bump the scheduler timeout to something ridiculous, like 10 minutes, which I hope resolves the immediate problem of the EA failing. However, I am concerned about the other warnings that cropped up during the run. Are those things I should be concerned about? And, if so, does anyone have good guidance on how to resolve them?

mrocklin commented 4 years ago

Thank you for the update.

Bumping up timeouts generally would be good.

I don't actually have strong intuition on what would be causing the slowdown here. In principle I would love to see profiling information if we can get it, although profiling in this context might be hard.

You could try running the scheduler with the builtin cProfile profiler. You could also try using the performance_report functionality built into Dask. I'm not sure that either one will be very accurate here. It may also be interesting to profile this as one would profile a compiled C/C++ program. That would avoid any issues that might arise from Python itself, which is probably under some strain.
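For example, the client-side workload can be wrapped in a performance_report, and the scheduler process can be started under cProfile instead of the plain entry point; a rough sketch, with placeholder paths and a toy workload:

from distributed import Client, performance_report

client = Client(scheduler_file="/path/to/scheduler_file.json")

# Writes an HTML report with the task stream, worker profiles, bandwidths, etc.
with performance_report(filename="dask-report.html"):
    futures = client.map(lambda x: x + 1, range(1000))  # stand-in workload
    client.gather(futures)

# Scheduler-side, something along the lines of:
#   python -m cProfile -o scheduler.prof $(which dask-scheduler) \
#       --interface ib0 --no-dashboard --scheduler-file $SCHEDULER_FILE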

rabernat commented 4 years ago

Following with interest.

We have done runs with ~1000 worker on Cheyenne with no major issues. For that, we used dask-mpi, which allowed us to do it all in a single big MPI job.

My impression is that a central challenge you will have in scaling up to the level you describe is giving the workers something to do. In our experience, the dask scheduler starts to really choke when you have > 1_000_000 tasks in your graph. This central chokepoint provides a hard limit on the absolute size of computations you can do with dask, regardless of the number of workers in your cluster.

mrocklin commented 4 years ago

the dask scheduler starts to really choke when you have > 1_000_000 tasks in your graph.

In what way did it choke? If my memory serves in your case it wasn't able to respond at interactive time scales (things would sit still for minutes). This is certainly an issue and something that we would love help in fixing. I think that the thing to do here is to start doing some more intense profiling. If anyone has experience profiling networking applications like this, that would be of great help.

However, this limit may also not be as bad if we're talking about non-interactive jobs, which may be the case for the folks on Summit. (It's still important there as well, but we probably have another order of magnitude to play with).

markcoletti commented 4 years ago

Thank you for the update.

No problem! And sorry for being silent for a week as I was wrapped up with other tasking.

Bumping up timeouts generally would be good.

Duly noted. And I'll share whether the 10 minutes works. (Which may be overkill. However, we could probably use these results to inform a dask "autotune" feature that dynamically adjusts timeouts based on the number of workers.)

[...] In principle I would love to see profiling information if we can get it, although profiling in this context might be hard.

You could try running the scheduler with the builtin cProfile profiler. You could also try using the performance_report functionality built into Dask. [...]

I've used that for a different project, and I'll do the same for this one. I'll be happy to share the results once I get them.

Thanks again for chiming in!

rabernat commented 4 years ago

Yes good points Matt, and apologies for the non-technical term "choke". What I really mean is the following...

Each computation (containing N tasks) effectively has two parts, which occur in serial:

  1. the time it takes for the scheduler to prepare to execute the graph, and
  2. the time it takes the workers to actually execute it.

For small jobs, 1 is very small, almost negligible. However, I have the impression that it scales very poorly, like O(N^2). I have not measured this, but someone should. Because of this very different scaling between parts 1 and 2, it places, for us, a hard limit on the number of tasks we can run at once. For example, 10_000_000 tasks takes about 30 minutes to finish step 1, and I have never run 100_000_000.
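A crude way to put numbers on that split, for whoever wants to measure it (N and the connection details are placeholders, and submission time is only a rough proxy for part 1):

import time
from distributed import Client, wait

def noop(i):
    return i

client = Client(scheduler_file="/path/to/scheduler_file.json")

N = 100_000
t0 = time.time()
futures = client.map(noop, range(N))  # client/scheduler-side graph handling
t1 = time.time()
wait(futures)                         # actual execution on the workers
t2 = time.time()
print(f"submit: {t1 - t0:.1f}s, execute: {t2 - t1:.1f}s, "
      f"overhead per task: {(t1 - t0) / N * 1e6:.0f} us")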

mrocklin commented 4 years ago

Thanks Ryan. You've brought this up several times before. I think that each time I've suggested that it would be great to reduce the per-task overhead (which I put at about 200-500us). The next step here is probably profiling, both at a Python level, and then probably at a lower C/C++ level.

@rabernat you're one of the people that could help to resolve this problem with your Pangeo resources. Maybe this is something that you could direct some of your funding or collaborators towards?

jakirkham commented 4 years ago

We have done runs with ~1000 worker on Cheyenne with no major issues. For that, we used dask-mpi, which allowed us to do it all in a single big MPI job.

This is an interesting observation. Thanks for sharing Ryan! May be worth exploring if you have time @piprrr 😉

It's also worth noting that using job arrays in dask-jobqueue probably would address this same need ( https://github.com/dask/dask-jobqueue/issues/196 ).

markcoletti commented 4 years ago

I'm trying to avoid adding the MPI dependency. I've had bad prior experiences on Titan and Summit with MPI support. Moreover, we are having to rely on singularity containers to run some of the code, and getting MPI to work from within the container is going to be a little tricky. So, if we can have code that doesn't rely on MPI, as is the case with basic distributed dask, then all the better.

However, if I continue to struggle scaling up to more nodes on Summit, then I will definitely give MPI-aware distributed dask a look. I'll let you know if I do!

rabernat commented 4 years ago

A clarification about dask-mpi: the only thing MPI is used for is launching the dask workers, not for communication or anything else. So maybe it's not so bad.
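In practice it boils down to something like this (run under mpirun or jsrun with N ranks; rank 0 becomes the scheduler, rank 1 runs the client code, and the remaining ranks become workers):

from dask_mpi import initialize
from distributed import Client

# MPI is only used to bootstrap: broadcast the scheduler address and decide
# which rank plays which role. Dask's normal TCP comms take over after that.
initialize(interface="ib0", nthreads=1)  # assumes the same ib0 interface as above
client = Client()                        # finds the scheduler started by rank 0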

markcoletti commented 4 years ago

A clarification about dask-mpi: the only thing MPI is used for is launching the dask workers, not for communication or anything else. So maybe it's not so bad.

Thank you for the clarification as I thought that MPI had a more significant role. Good to know.

jakirkham commented 4 years ago

That's right. We are trying to use UCX for communication purposes instead.

mrocklin commented 4 years ago

Making an MPI Comm is also a reasonable thing to do. It just hasn't been done yet.


markcoletti commented 4 years ago

A quick update: I was successfully able to run an experiment on Summit on 921 nodes with six dask workers each, for a total of 5,526 dask workers. In total there were 27,631 evaluations performed.

I've just submitted two new Summit jobs, one for half the machine, and the other for the full machine. (I.e., for 2,764 and 4,608 nodes) As we are now well over our quota, I don't expect those jobs to run until next week. (That they're also asking for a significant number of nodes isn't going to help, either. ;) )

markcoletti commented 4 years ago

The 2,764 node job failed, and I may need help reading the tea leaves on this one.

First, the job ran for the fully allotted time, but was hung and killed by the system:

User defined signal 2
Could not read jskill result from pmix server
Could not read jskill result from pmix server
Could not read jskill result from pmix server

I look in the output and see that it had only gotten so far in allocating workers:

Waiting for workers
Deadline set for 5 minutes
Target workers: 8292
Scheduler timeout: 600 secs
Pause time: 5 secs
Scheduler file: /gpfs/alpine/csc363/scratch/mcoletti/sleeper/2764_node/scheduler_file.json
seconds number of workers
0.00    7
130.17  50
135.18  136
140.20  209
145.21  259
150.24  352
155.28  536
160.32  657
165.72  862
172.03  1418
180.27  1912

Yes, I could have used Client.wait_for_workers(), but my wait_for_workers.py script gives a little more detail on worker allocation. In any case, the maximum total number of workers was 16584, but I had set the wait for workers to allow the EA to start after only half of those were allocated, or 8292, or five minutes, whichever came first. However, it is in that process that execution hung, so it was during a call to client.scheduler_info() that things got wedged.

I had the job configured so that each individual worker saved its respective stdout and stderr to its own file, so I went to the stderr for some of them, and this is what I saw:

Traceback (most recent call last):
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/util.py", line 265, in _run_finalizers
    finalizer()
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/util.py", line 189, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/synchronize.py", line 88, in _cleanup
    sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory

and:

FileNotFoundError: [Errno 2] No such file or directory
Exception ignored in: <Finalize object, dead>
Traceback (most recent call last):
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/util.py", line 189, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/multiprocessing/synchronize.py", line 88, in _cleanup
    sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory
distributed.nanny - INFO - Closing Nanny at 'tcp://10.41.0.101:38369'
distributed.dask_worker - INFO - End worker

I have no idea what file it was looking for, but that would appear to be the ultimate source of the problem. Any ideas on how to proceed?

mrocklin commented 4 years ago

I don't know exactly what is going on here, but as a workaround, you could try passing the --no-nanny flag to the dask-worker command.

The context is that dask workers are started up by a Nanny, which is used to check health and restart it if necessary. That Nanny uses multiprocessing pipes and other functionality to ensure that it gets a signal when the worker dies. My guess is that some of those mechanisms in Python rely on a file system, which, presumably isn't a great idea at this scale. The easiest thing to do is to just not use a Nanny process and see if that gets you past things.


markcoletti commented 4 years ago

That seems reasonable, so I've resubmitted the job sans nannies. I'll let you know the outcome as soon as the job runs, which may not be for several days.

(We're wayyyy over quota for that Summit account, and our submitted jobs are rightfully being dinged for it.)

mrocklin commented 4 years ago

Ha!

I also think that at some point it probably makes sense to test not whether we can connect, but if things are performant at all. My guess is that even if we can get tens of thousands of machines hooked up we'll still run into performance issues at a scale that is far smaller.


markcoletti commented 4 years ago

I have plans to do some post mortem analysis of the runs, and one of the things I will be looking for will be "Did all the workers do work?" Given that some of the workers will still be coming up when the main program runs, I expect there to be a little non-uniformity in the load distribution, but hopefully all workers will have done something. I also plan on doing later runs with performance reports toggled on to see if that yields anything useful, but I've not done so now for fear that may trigger problems.

markcoletti commented 4 years ago

2,764 node job failed again, which is getting tiresome.

The wait-for-workers utility died with this error:

Raised CommClosedError('in <closed TCP>: TimeoutError: [Errno 110] Connection timed out') during execution
Wait for workers failed. Aborting.

The only thing it's connecting with is the scheduler, and the scheduler timeout was ten minutes. I ... guess ... I could up that? :-\

And looking through the error logs I saw 3,043 instances of this error:

distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <Worker 'tcp://10.41.10.240:35107', name: tcp://10.41.10.240:35107, memory: 0, processing: 0>
distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.41.10.240:35107', name: tcp://10.41.10.240:35107, memory: 0, processing: 0>

So, made these changes to ~/.config/dask/distributed.yaml:

@@ -23,7 +23,7 @@ distributed:
     transition-log-length: 100000
     work-stealing: True     # workers should steal tasks from each other
     work-stealing-interval: 100ms  # Callback time for work stealing
-    worker-ttl: 300s        # like '60s'. Time to live for workers.  They must heartbeat faster than this
+    worker-ttl: 900s        # like '60s'. Time to live for workers.  They must heartbeat faster than this
     pickle: True            # Is the scheduler allowed to deserialize arbitrary bytestrings
     preload: []
     preload-argv: []
@@ -94,8 +94,8 @@ distributed:
       threads: -1    # Threads to use. 0 for single-threaded, -1 to infer from cpu count.

     timeouts:
-      connect: 60s          # time before connecting fails
-      tcp: 90s              # time before calling an unresponsive connection dead
+      connect: 600s          # time before connecting fails
+      tcp: 900s              # time before calling an unresponsive connection dead

mrocklin commented 4 years ago

At this point I personally would probably stop trying to push things as high as they can go, and instead try to figure out what is causing friction at lower scales. I understand that that may be challenging though.


markcoletti commented 4 years ago

Well, it doesn't cost me anything to jigger some things and resubmit the jobs. I can carry on other tasking while the jobs are pending. However, I'd be interested in pointers in trying to dig a little deeper into what some of the bottlenecks might be. I suspect the biggest one is going to be the scheduler, though I've given that 2 CPUs to crank on.

Honestly, for the stuff we're trying to do, I think even grabbing 900 nodes, which I've proved works, is probably adequate for most of our problems. However, our flagship distributed EA, MENNDL, can run on the whole machine, so I'd like to at least demonstrate similar capability for the distributed dask based code.

I also haven't tried the MPI-based distributed dask, which may be a direction I go next, even though it's only responsible for setting up workers.

mrocklin commented 4 years ago

I also haven't tried the MPI-based distributed dask, which may be a direction I go next, even though it's only responsible for setting up workers.

Making an MPI based communication system for Dask is also in-scope if anyone wanted to take it on. That would probably reduce setup woes. I would estimate this at about a week for an experienced Dask dev, and maybe a month for a decent dev who was not experienced with Dask comms.


jakirkham commented 4 years ago

There's already UCX-Py. Maybe try that before writing a new comm? 😉

cc @quasiben @pentschev

markcoletti commented 4 years ago

Here's an overdue update: I'm still struggling to get the 2,764 node runs to work with dask. Progress is slow because we're well over our allotment of Summit hours; we can still submit jobs, it just might take a few days before they run.

In any case, the last run had some new errors, which I count as a sort of progress.

First were the usual unresponsive Scheduler and Worker failed to heartbeat errors:

distributed.core - INFO - Event loop was unresponsive in Scheduler for 6.13s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.scheduler - WARNING - Worker failed to heartbeat within 900 seconds. Closing: <Worker 'tcp://10.41.1.217:43433', name: tcp://10.41.1.217:43433, memory: 0, processing: 0>

Fair enough, but there were some new errors afoot this go round.


distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.77s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
Future exception was never retrieved
future: <Future finished exception=OSError("Timed out trying to connect to 'tcp://10.41.0.221:32929' after 600 s: Timed out trying to connect to 'tcp://10.41.0.221:32929' after 600 s: connect() didn't finish in time")>
Traceback (most recent call last):
  File "/autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/core.py", line 232, in connect
    _raise(error)
  File "/autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/comm/core.py", line 213, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://10.41.0.221:40215' after 600 s: connect() didn't finish in time

I'm guessing a Future finished, but had difficulty propagating back to the Client. If so, this is troubling.

I also observed this error:


ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<Client._update_scheduler_info() running at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/distributed/client.py:1098> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x200003d4de88>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /autofs/nccs-svm1_proj/csc363/may/LEAP/venv/lib/python3.7/site-packages/tornado/ioloop.py:690]>
ERROR:asyncio:Task was destroyed but it is pending!

I'm not entirely sure what's up with that.

Moving forward, I've removed a call to my wait_for_workers.py in the batch submission script, and added a call to Client.wait_for_workers(). My hypothesis is that having two back-to-back client connections may be causing problems, especially if workers are still coming on line. I confess that's a flimsy model for what's going on, but this was something I was going to implement anyway, and I may as well give it a go before proceeding on to the next attempt.
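So the start of the client now looks roughly like this (the path is a placeholder; 8,292 is half the full worker complement for this job size):

from distributed import Client

client = Client(scheduler_file="/path/to/scheduler_file.json", timeout=600)
client.wait_for_workers(n_workers=8292)  # block until half the workers are up
# ... then hand the client to the EA as before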

If this fails, I'll begrudgingly try the MPI-based distributed dask implementation to see if that shakes things loose.

(Again, we're avoiding MPI because on Summit we must use IBM's proprietary Spectrum MPI library that's a fork of OpenMPI burdened by DRM garbage that's not exactly a joy to get to work inside a Singularity container. But, if we must suffer that option to get whole machine runs, then suffer we shall.)

jakirkham commented 4 years ago

Are you using array jobs to submit the workers or is each worker its own job?

benjha commented 4 years ago

There's already UCX-Py. Maybe try that before writing a new comm? 😉

cc @quasiben @pentschev

@jakirkham So, as far as I understand, the UCX-Py efforts are for dask-cuda and, while compiling it, we can explicitly set the UCX dependencies; but for Dask itself, which is already available for ppc64le, the question is whether the latest Dask versions have already been built with UCX support.

markcoletti commented 4 years ago

Are you using array jobs to submit the workers or is each worker its own job?

We chose to use a single job approach because there is a tight limit on the number of concurrent jobs on Summit, which is five. Since we're going for full machine runs, that means we need to do everything within a single job since 27,648 jobs (4,608 nodes x six workers per node) running simultaneously is wellll over that five job limit. ;)

And, just to be clear, we're currently shooting to get just half the machine to run, or 2,764 nodes. Once we get distributed dask to work at that level, we'll shoot for the whole enchilada.

For those familiar with Summit's batch queueing system, LSF, we use a single jsrun statement to fan out the dask workers. Here's an example:

jsrun --smpiargs="off" --nrs $NUM_WORKERS -e individual --stdio_stdout ${RUN_DIR}/worker_out.%h.%j.%t.%p --stdio_stderr ${RUN_DIR}/worker_error.%h.%j.%t.%p --tasks_per_rs 1 --cpu_per_rs 1 --gpu_per_rs 1 --rs_per_host 6 --latency_priority gpu-cpu dask-worker --nthreads 1 --nprocs 1 --interface ib0 --no-dashboard --no-nanny --reconnect --scheduler-file $SCHEDULER_FILE &

mrocklin commented 4 years ago

I'll begrudgingly try the MPI-based distributed dask implementation to see if that shakes things loose

I don't think that this will help you. All that dask-mpi does is use MPI to communicate the address of the scheduler, and then it connects as normal. We could make a proper Dask MPI communication system, but that would require a bit of work. I'm guessing a week or two of a full-time dev who had done this before.

cc'ing @fjetter who might have thoughts on stability with large numbers of workers

Regarding UCX I think that it would be interesting to try, mostly because it would help to isolate the problem away from TCP connections.

If I were to try to isolate the problem here I would probably start by running this on a smaller set of workers (merely 1000 or so :) ) and do some profiling. I would hope that that would highlight some bottleneck that a small dev investment could resolve. Then I would increase from there. Going directly for thousands of nodes without profiling seems potentially wasteful to me. This could be any of a hundred different issues. I think that we'll need to figure out what's going on before we try to push harder.

markcoletti commented 4 years ago

I'll begrudgingly try the MPI-based distributed dask implementation to see if that shakes things loose

I don't think that this will help you. All that dask-mpi does is use MPI to communicate the address of the scheduler, and then it connects as normal. [...]

Ok, thanks for the heads up, Matthew.

[...] If I were to try to isolate the problem here I would probably start by running this on a smaller set of workers (merely 1000 or so :) ) and do some profiling. I would hope that that would highlight some bottleneck that a small dev investment could resolve. Then I would increase from there. Going directly for thousands of nodes without profiling seems potentially wasteful to me. This could be any of a hundred different issues. I think that we'll need to figure out what's going on before we try to push harder.

I'm not a distributed dask developer, but I'm willing to share profiling information here. I suspect any bottlenecks identified on Summit are likely to manifest for others as well, albeit in lesser form.

I suspect that the bottlenecks are going to be in the scheduler. For example, just monitoring all the worker heartbeats is a burdensome task, I imagine.