This is a very helpful data point @jbusecke, thank you! I'm sad but excited that turning off spilling helped. I'm probably more wrong than right about this, but it may also have something to do with https://github.com/dask/distributed/issues/4424. Basically, while workers are spilling or un-spilling data from disk, they're unresponsive to any other events, including transferring data to other workers, responding to the scheduler, starting on other tasks, etc. I wrote up a comment on that issue with some hypotheses about how that might affect the system.
Very curious to see what happens on the production workflows!
Update: Now that I've looked more carefully at the notebook, I don't think the dask.config.set({"distributed.worker.memory.spill": None}) is actually being propagated to your workers. dask.config.set only applies to the local machine:
In [1]: import dask
In [2]: import distributed
In [3]: client = distributed.Client("tcp://192.168.0.39:8786")
In [4]: client.run(lambda: dask.config.get("distributed.worker.memory.spill"))
Out[4]: {'tcp://192.168.0.39:54908': 0.7}
In [5]: with dask.config.set({"distributed.worker.memory.spill": None}):
...: print(client.run(lambda: dask.config.get("distributed.worker.memory.spill")))
...:
{'tcp://192.168.0.39:54908': 0.7}
In the case of this particular config setting, because it's referenced right when the worker is created, you'd need to either set the DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=None environment variable in your pangeo Docker image, or pass the memory_spill_fraction kwarg to the worker when it's created. On Coiled we support that, but I don't know if there's anything equivalent you could do on Pangeo to get this setting in there?
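For illustration, here is a minimal local sketch (not the Pangeo setup; the LocalCluster and the check function are just stand-ins) of why the variable has to exist before the worker processes start, so each one picks it up when it loads its config:

import os

# Set the variable before any workers start; spawned worker processes
# inherit the environment and read DASK_* variables when they load config.
# The string "None" is parsed back to Python None by dask's config machinery.
os.environ["DASK_DISTRIBUTED__WORKER__MEMORY__SPILL"] = "None"

from distributed import Client, LocalCluster

def spill_setting():
    import dask
    return dask.config.get("distributed.worker.memory.spill")

cluster = LocalCluster(n_workers=2)
client = Client(cluster)
print(client.run(spill_setting))  # expect None reported by every worker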
Nonetheless, something there made that last run go much better, but I'm even more mystified what it could be. I'm currently trying to rerun your notebook on Pangeo staging, both with and without the dask.config.set line, to see if it changes anything.
Thanks for trying all this out @jbusecke. Unfortunately I don't think the code in the notebook actually disables spilling. Setting dask.config.set({"distributed.worker.memory.spill": None}) in the notebook will impact the config value of distributed.worker.memory.spill on the client, but not on the scheduler or workers, since they are different machines. Note that since this config value is only used on the workers, setting it on the client side won't have any functional impact.
To test that out, you can run

def get_config_value(key):
    import dask
    return dask.config.get(key)

print(get_config_value("distributed.worker.memory.spill"))
inside the notebook to get the current distributed.worker.memory.spill config value, and then run
print(client.run(get_config_value, "distributed.worker.memory.spill"))
print(client.run_on_scheduler(get_config_value, "distributed.worker.memory.spill"))
to do the same for the workers and scheduler (Client.run and Client.run_on_scheduler allow you to run a function on the remote workers / scheduler and then send the result back).
Whoops, looks like @gjoseph92 beat me to it. FWIW, when I ran through the notebook I got very similar performance with and without spilling -- both looked like the performance_report_full that @jbusecke posted.
Thank you for looking into this, and the additional info. That confuses me even more, then. I definitely had some major issues with the full calculation, but it seems that is not a persistent issue? FWIW, these problems still seem to be somewhat reflected in my production workflow (and indeed the workers are still spilling!). The production workflow adds a few masking operations (separating ocean basins, making sure that weights have the same missing values as args before passing to histogram), which usually add a bunch more tasks. The only actionable idea I have right now is to simulate these operations in the benchmark notebook here too.
Or do you have any other ideas what I can do to clarify this problem?
The production workflow adds a few masking operations (separating ocean basins, making sure that weights have the same missing values as args before passing to histogram), which usually add a bunch more tasks.
If you have a region variable with value 1 in the Atlantic, 2 in the Pacific, for example, can you not pass this as an extra variable to "group by" in xhistogram with bins [0.5, 1.5, 2.5]?
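A rough sketch of what that could look like (the variable names, dims, bin edges, and dummy data below are hypothetical, not taken from the notebook):

import numpy as np
import xarray as xr
from xhistogram.xarray import histogram

# hypothetical stand-ins for the real fields
thetao = xr.DataArray(
    np.random.rand(4, 10, 20, 30) * 30,
    dims=["time", "lev", "y", "x"], name="thetao",
)
volume = xr.ones_like(thetao).rename("volume")  # cell-volume weights
region = xr.DataArray(                          # 1 = Atlantic, 2 = Pacific
    np.random.choice([1, 2], size=(20, 30)), dims=["y", "x"], name="region",
).broadcast_like(thetao)

temp_bins = np.arange(-2, 36, 1.0)
region_bins = np.array([0.5, 1.5, 2.5])

# joint histogram of temperature and basin label; the region_bin axis of the
# result then separates the basins, so no per-basin masking or looping is needed
hist = histogram(
    thetao, region,
    bins=[temp_bins, region_bins],
    weights=volume,
    dim=["x", "y", "lev"],
)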
PS: I found this repo from https://github.com/xgcm/xhistogram/pull/49, and assume it's OK to participate since it is public.
If you have a region variable with value 1 in the Atlantic, 2 in the Pacific, for example, can you not pass this as an extra variable to "group by" in xhistogram with bins [0.5, 1.5, 2.5]?
Oh, that is actually an excellent idea. I'll try that out!
Appreciate the help @dcherian!
@dcherian: This works really nicely! Saves me a lot of files and fuss.
@jrbourbeau @gjoseph92 : I am still trying to disable spilling. I have followed these instructions to set environment variables. This is what I am doing:
from dask_gateway import Gateway

gateway = Gateway()

# set env variables
options = gateway.cluster_options()
env = {
    # "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": False,
    # "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 5,
    "DASK_DISTRIBUTED__WORKER__MEMORY__SPILL": None,
}
options.environment = env

cluster = gateway.new_cluster(options)

# scale up the cluster
# cluster.scale(10)
cluster.adapt(minimum=2, maximum=10)  # or to a fixed size.
cluster

client = cluster.get_client()
client
I then follow @jrbourbeau's instructions to check if that has an effect:
# Proper way to check config values (https://github.com/ocean-transport/coiled_collaboration/issues/9#issuecomment-829765791)
def get_config_value(key):
    import dask
    return dask.config.get(key)
print(client.run(get_config_value, "distributed.worker.memory.spill"))
print(client.run_on_scheduler(get_config_value, "distributed.worker.memory.spill"))
I am getting {'tls://10.38.71.7:42651': '', 'tls://10.38.85.8:42633': ''}. I assume this means spilling is activated?
When I run my computation I still see spilling (orange tasks). Am I getting this wrong still?
Would some of this be easier if we switched to Coiled dask clusters?
The CMIP6 data are all on AWS now, so the choice of cloud is not a problem.
Would be happy to check that out. What would be needed from me to get an account?
Try using the string "None" instead of None when you make env. Though I'm not certain it'll make a difference in how dask interprets it in the end.
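That is, a sketch of the suggested change, otherwise identical to the cluster setup above:

env = {
    # pass the value as the string "None" so it survives being serialized
    # into the workers' environment and is parsed back to None by dask
    "DASK_DISTRIBUTED__WORKER__MEMORY__SPILL": "None",
}
options.environment = env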
When I run my computation I still see spilling (orange tasks). Am I getting this wrong still?
Yes, if you're seeing orange in the task stream (disk-read- tasks), then spilling is still happening. However, if you just see bars on the worker memory usage chart turning from blue to orange, that doesn't necessarily mean spilling is happening, just that your workers have gone beyond their memory target.
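One way to double-check which thresholds the workers are actually using (a sketch that assumes the existing client and the worker attribute names in the distributed version in use here):

def memory_thresholds(dask_worker):
    # Client.run passes the Worker instance when the function
    # takes a `dask_worker` argument
    return {
        "memory_limit": dask_worker.memory_limit,
        "target_fraction": dask_worker.memory_target_fraction,
        "spill_fraction": dask_worker.memory_spill_fraction,
    }

print(client.run(memory_thresholds))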
Would some of this be easier if we switched to Coiled dask clusters?
Potentially. But because Pangeo offers gateway.cluster_options().environment (I didn't know this), I think we can get pretty far with this particular question on Pangeo. I think what could become more of a reason to switch to Coiled is the ease of picking different dask, distributed, and xhistogram versions. If we find ourselves needing to do that, it's probably a bit easier to customize with Coiled.
Last, here's a really awful hack you could try to forcibly turn spilling off once the cluster has already started. If using the string "None" doesn't work, then you could try this. You'd run it immediately after client = cluster.get_client(), before any work has been submitted:
import distributed

def patch_spilling_off(dask_worker: distributed.Worker):
    assert len(dask_worker.data) == 0, "Can only patch spilling when no data is stored on the worker"
    dask_worker.data = dict()
    dask_worker.memory_spill_fraction = None

client.run(patch_spilling_off)
I tried running with this on Pangeo, and it would run great (no idle time on the task stream)... until I'd get a WorkerKilled from a worker allocating too much memory. On all workers, the memory usage just kept going up; some would be successfully restarted by the nanny, but eventually one would just die. The computation never got more than ~1/3 of the way, but here's a performance report for that third: full-real-nospill.html
I just tried "None" and I am now getting other values from the get_config_value call:
{'tls://10.36.6.31:35351': None, 'tls://10.36.6.32:33573': None, 'tls://10.36.6.33:36247': None, 'tls://10.36.6.34:44797': None, 'tls://10.38.74.4:38453': None, 'tls://10.38.76.8:40521': None, 'tls://10.38.77.6:37843': None, 'tls://10.38.81.11:34085': None, 'tls://10.38.83.15:38163': None}
None
But the workers are still spilling to disk?!?
I have just pushed the newest version of my testing notebook, with all the steps I added...
I tried running with this on Pangeo, and it would run great (no idle time on the task stream)... until I'd get a WorkerKilled from a worker allocating too much memory. On all workers, the memory usage just kept going up; some would be successfully restarted by the nanny, but eventually one would just die. The computation never got more than ~1/3 of the way, but here's a performance report for that third: full-real-nospill.html
Even with the spilling, it seems that the memory usage just keeps growing, but I don't get why. This whole operation should be very well parallelizable. Is there some way to see if things are not getting released? At least some chunks are actually stored.
Maybe this is better handled fresh on Monday haha.
What would be needed from me to get an account?
You can go to https://cloud.coiled.io to sign up and https://docs.coiled.io for our documentation. I'm also happy to hop on a call at some point if you'd like a guided tour.
if you're seeing orange in the task stream (disk-read- tasks), then spilling is still happening
But the workers are still spilling to disk?!?
Today Dask workers can't distinguish between writing a task result to disk and it simply taking some non-negligible amount of time to put the result into memory (see here for where we determine when an orange rectangle should appear in the task stream). So, while usually orange in the task stream corresponds with spilling to disk, this isn't always the case.
I'm getting the sense that it might be worth us jumping on a call. I suspect a few minutes of us looking through things together will help save a lot of back-and-forth. @jbusecke are there some times next week that work well for you?
I'm getting the sense that it might be worth us jumping on a call. I suspect a few minutes of us looking through things together will help save a lot of back-and-forth. @jbusecke are there some times next week that work well for you?
That would be awesome. Much appreciate the offer. I could do as early as Monday 11AM EST. Would that work for you by any chance?
You can go to https://cloud.coiled.io to sign up and https://docs.coiled.io for our documentation. I'm also happy to hop on a call at some point if you'd like a guided tour.
I just signed up and will take a look around, but might take you up on that offer some time in the future.
Today Dask workers can't distinguish between writing a task result to disk and it simply taking some non-negligible amount of time to put the result into memory (see here for where we determine when an orange rectangle should appear in the task stream). So, while usually orange in the task stream corresponds with spilling to disk, this isn't always the case.
Got it, so I might have misinterpreted this.
Either way, I will put in another hour or so playing around. I'll post results, if only so I can find them for the meeting!
I could do as early as Monday 11AM EST. Would that work for you by any chance?
That sounds great -- just sent you / Gabe a calendar invite. Happy to invite others too, just let me know if you'd like anyone to be included.
I just singed up and will take a look around, but might take you up on that offer some time in the future.
đź‘Ť
Cool. Excited to see you on Monday. Enjoy the weekend!
@jrbourbeau, @jbusecke, and I met this morning; it was very productive to be able to go back and forth in real time! We came away suspicious of a graph structure issue, since we observed that even after thousands of array-creation tasks had run, no store tasks had run (and none were ready to run). We think there is some sort of artificial bottleneck which is forcing many (all?) array creations to complete before any stores run.
Visualizing the high-level graph, we noticed an odd fan-out-fan-in structure (you wouldn't normally see this in a high-level graph), with many parallel bincount layers which all fan into a single concatenate. It's that concatenate we're suspicious of. We think it might be coming from this code in xhistogram: https://github.com/xgcm/xhistogram/blob/bb1fa237696f382fedb4a9eceed3d9df941aea64/xhistogram/core.py#L44-L51. I'm going to investigate the graphs a bit more, but I have a feeling this could be rewritten to use blockwise.
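For illustration only (this is not xhistogram's code, and the shapes and bins are made up): per-block histogram counts can be expressed as a single map_blocks layer (which lowers to blockwise) followed by a tree-reduction sum, with no up-front concatenate of every block:

import numpy as np
import dask.array as da

x = da.random.random((360, 291, 45, 1980), chunks=(360, 291, 45, 10))
bins = np.linspace(0, 1, 26)  # 25 bins

def block_hist(block):
    # histogram of a single chunk, returned with singleton block dimensions
    # so the per-chunk counts can be combined by a normal dask reduction
    counts, _ = np.histogram(block, bins=bins)
    return counts[None, None, None, None, :]

per_block = da.map_blocks(
    block_hist,
    x,
    chunks=(1, 1, 1, 1, len(bins) - 1),
    new_axis=4,
    dtype=np.intp,
)

# summing the per-chunk counts is a tree reduction, so no single task ever
# needs to hold all of the intermediate results at once
hist = per_block.sum(axis=(0, 1, 2, 3))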
Thank you both for the super informative meeting @gjoseph92 @jrbourbeau.
I have committed a slightly cleaned up notebook, which can be used as a rough 'diff' to see what we saw.
Hey @jbusecke, some weird news: I just can't reproduce the issue. I'm running this notebook (your version, with a missing dask import added, switched to only computing the full-size arrays we were looking at this morning) on https://staging.us-central1-b.gcp.pangeo.io/, where it looks like:
dask == distributed == 2021.3.1
xhistogram == 0.1.2
xarray == 0.17.0
The computations are running smoothly with 20 workers (40 cores, 171.80 GB memory), both writing to a real zarr and with our map_blocks(lambda x: np.empty((0,0,0))) hack. This was with the "full size" array of nx, ny, nz, nt = 360, 291, 45, 1980. Just for fun, I bumped nt up to 6000, and that worked fine too.
I also happened to run it locally on my mac (8 cores, 32 GiB memory) against xhistogram==0.1.3, dask @ https://github.com/dask/dask/pull/7594, and distributed @ https://github.com/dask/distributed/commit/97c66db49e6285f7c75cf1b5eddc9e917b6de2eb (main from ~4 days ago), and it did just fine.
Here are performance reports and screengrabs of the dashboard for each of these runs on Pangeo staging:
2d-histogram-nullstore: dropping the memory on completion with the map_blocks trick
2d-histogram-zarr: actually storing to zarr
2d-histogram-zarr-6000: storing to zarr, nt = 6000 just for fun
The main thing to note is that in all of these, the store (or lambda) tasks are running alongside everything else, as we'd hope. I'm not seeing the same behavior we saw this morning, where the stores wouldn't run even though lots of data should have been available to store.
Oh damn. The only small thing I believe I changed was the vertical chunk size cz, and I was running on production. Let me test this on staging.
What was cz before? I'm realizing that the lack of spatial chunks might be making a big difference here.
cz=20 (from the old notebook).
Sorry for that oversight, I just kinda changed it while cleaning up. I will concurrently test on staging and production.
Ok, I'm testing with that on staging now
Ok, this is really weird. It works for me on staging (with and without z chunks) AND on production (only tested with z chunks).
This brings me back to @stb2145's issue in #8. If I am correct, there was also some sort of 'non-reproducible' element to that, no?
I gotta pack up over here, but I'll get back to this tomorrow.
My proposed steps:
I am still puzzled by this... the task graph still shows the weird fanning in/out behavior, but store/lambda tasks are advancing as expected.
Oh wait, I just got it to mess up again. It was working nicely, and then just stopped storing... wow, this is just super weird. I just pushed the exact version that failed here.
It stored 50 chunks (task graph looked similar to yours), and then this:
@jbusecke are you sure you had the same number of workers every time? In that dashboard above it looks like there are 5 workers. I'd been trying to wait to run things until all 20 were available.
I think ideally it should still be able to work with 5, just a lot slower. But maybe that could explain the "magic" variation?
Ah, good catch. I think you are right and the number was lower. I unfortunately won't have time to get back to this today, but will put in some time tomorrow. Hopefully we can get to the bottom of this.
In that dashboard above it looks like there are 5 workers. I'd been trying to wait to run things until all 20 were available.
I would say that, in general, this is a huge pain point for dask users. How many workers does a given computation need? And how much memory per worker? Are there any guidelines? Mostly we find out through trial and error (and burn a lot of compute in the process).
In traditional HPC we think a lot about strong / weak scaling. Good strong scaling means that, as the number of workers increases, the runtime decreases proportionally. For dask workflows like this, however, it seems that things just grind to a halt if we don't have enough workers.
Is it worth looking systematically at strong / weak scaling for some problems? We did a little bit in this paper, but not with workflows of this complexity.
Some preliminary tests with https://github.com/xgcm/xhistogram/pull/49 seem to indicate that it will solve my issue here. I will close for now, and reopen if necessary. Thanks for all the help @gjoseph92 @jrbourbeau
Following @gjoseph92 suggestion, I am opening a separate issue here, since the joint distribution problem seems to be sufficiently distinct from #8.
I have revised my notebook slightly, now storing the output to a zarr store in the pangeo scratch bucket instead of loading it to memory.
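For reference, a minimal sketch of what storing to the scratch bucket can look like (the PANGEO_SCRATCH environment variable, store path, and dummy dataset here are assumptions/stand-ins, not taken from the actual notebook):

import os
import fsspec
import numpy as np
import xarray as xr

# assumes PANGEO_SCRATCH points at gs://pangeo-scratch/<username> on the hub
store = fsspec.get_mapper(f"{os.environ['PANGEO_SCRATCH']}/joint_histogram_test.zarr")

# dummy stand-in for the computed joint histogram
hist = xr.DataArray(
    np.zeros((1980, 25, 20)),
    dims=["time", "thetao_bin", "so_bin"],
    name="histogram_thetao_so",
)
hist.to_dataset().to_zarr(store, mode="w", consolidated=True)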
I ran two 'experiments':
The shorter run finishes fine (I see a lot of comms, but it seemed that this is unavoidable due to the nature of the problem).
The longer run looks ok in the task graph, but the memory does fill up substantially, which leads to some orange disk-spilling, and things slow down a lot. The computation sometimes finishes, but it takes forever. I do not understand why this would happen. The only abstract way of making sense of this is that for a given time slice, the operations that work on one variable have to 'wait' for the other variables to become available?
To troubleshoot, I ran with dask.config.set({"distributed.worker.memory.spill": None}), and that seems to magically fix the problem! 🙀 performance_report_nospill
I have to admit I do not understand why this is happening, but I'll take it! I will see if that speeds up my production workflow and report back!
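For context, performance reports like the ones linked in this thread can be generated with dask's performance_report context manager. A minimal, self-contained sketch, using a throwaway local cluster and a dummy computation:

import dask.array as da
from dask.distributed import Client, performance_report

client = Client()  # small local cluster, just for illustration

x = da.random.random((20_000, 20_000), chunks=(2_000, 2_000))

# everything computed inside the block is captured in the HTML report
with performance_report(filename="performance_report_nospill.html"):
    x.std().compute()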