ocean-transport / coiled_collaboration

Repository to track and link issues across other repositories, which are relevant to the Abernathey Lab and Coiled
0 stars 0 forks source link

Newer dask/distributed versions take very long to start computation. #2

Open jbusecke opened 3 years ago

jbusecke commented 3 years ago

Hi everyone,

here is a relatively recent issue that puzzles me (and prevents me from upgrading to the latest dask/distributed versions).

For large computations, it can take very long until a any computation is "started", as judged from nothing happening in the task stream/ProgressBar(for threaded scheduler).

This example (which mimics part of my typical workload with high-resolution ocean model output) for example, has been showing nothing for several minutes now (It has been about 8-10 minutes at the point of writing this).

# Lets first create a simple dummy dataset
import dask.array as dsa
import xarray as xr
nt = 3600

so = xr.DataArray(dsa.random.random(size=(1440, 1080, 35, nt), chunks=(1440, 270, 35, 1)), dims=['x', 'y', 'z', 'time'])
thetao = xr.DataArray(dsa.random.random(size=(1440, 1080, 35, nt), chunks=(1440, 270, 35, 1)), dims=['x', 'y', 'z', 'time'])
a = xr.DataArray(dsa.random.random(size=(1440, 1080, 35, nt), chunks=(1440, 270, 35, 1)), dims=['x', 'y', 'z', 'time'])
b = xr.DataArray(dsa.random.random(size=(1440, 1080, 35, nt), chunks=(1440, 270, 35, 1)), dims=['x', 'y', 'z', 'time'])

ds = xr.Dataset({'so':so, 'thetao':thetao, 'a':a, 'b':b})

# layer a simple xarray.apply_ufunc
def simple_func(salt, temp):
    return salt+temp

ds['simple'] = xr.apply_ufunc(simple_func, ds.so, ds.thetao, dask="parallelized") # This also blows out the memory

ds
image

Then I set up an adaptive dask gateway cluster

from dask_gateway import GatewayCluster

cluster = GatewayCluster()
cluster.adapt(minimum=6, maximum=20) 
client = cluster.get_client()

I am then trying to write this to the pangeo scratch bucket

# Set up scratch bucket path
import os
PANGEO_SCRATCH = os.environ['PANGEO_SCRATCH']
# -> gs://pangeo-scratch/<username>
import fsspec
mapper = fsspec.get_mapper(f'{PANGEO_SCRATCH}/test/test.zarr')
# mapper can now be to read / write zarr stores

ds.to_zarr(mapper)

I am running this on the pangeo google deployment with a "Large" server.

My versions are:dask:2021.01.1 distributed:2021.01.1

I realize that these datasets are quite large, but they are by no means unrealistic for modern climate/earth system models.

NOTE: I just canceled my first run of this example after ~30 min.

I originally noticed this behavior in one of my research projects when I upgraded from 2020.12.0 version to the latest release (I believe 2021.02.x), and it led me to manually downgrade to get my workflow running since nothing would happen even after I waited for 30+ minutes.

This particular workflow is on a university HPC cluster, and hard to reproduce here, but I tested it with both versions there and was able to improve the behavior drastically with a downgrade to 2020.12.0. Unfortunately it is a bit more tricky to change the dask version on the pangeo deployment and the kubernetes dask workers.

  1. Do you have a hunch as to what changes in the recent version could have exacerbated this issue?
  2. On a more general level, is there a way to monitor 'progress' on what the scheduler does (I assume that it is wrangling with this huge graph?)?
jbusecke commented 3 years ago

I just ran the example again and the cluster started showing activity fairly quickly, so I am not entirely sure the above example is the best to expose this behavior, but perhaps you still have an idea what could be causing this?

jrbourbeau commented 3 years ago

Thanks for the nice example snippet @jbusecke!

Do you have a hunch as to what changes in the recent version could have exacerbated this issue?

Nothing immediately pops out to me. But there has been a lot of recent work on HighLevelGraphs, transmitting graphs to the scheduler, etc. which might have impacted your workflow.

I tried the example locally on my laptop with two changes:

using dask and distributed 2021.02.0 to see if I could attempt to reproduce this long time computation start time. However, locally I saw the task stream become active ~30 seconds after I kicked off the compute with ds.to_zarr(mapper). This makes me think that the slow down isn't due purely to the scheduler processing a large graph.

A couple of things immediately come to mind:

To test whether or not transmitting the graph to the scheduler is a large issue, could you try turning off low-level task fusion? Instead of

ds.to_zarr(mapper)

do

with dask.config.set({"optimization.fuse.active": False}):
    ds.to_zarr(mapper)

with should hopefully result in a much smaller graph getting sent over the wire to the scheduler.

Additionally, I see there have been recent releases of fsspec and gcsfs. Just for completeness, what versions of these packages are you using?

jrbourbeau commented 3 years ago

Stepping back a bit, I suspect that there will be times in the future when you encounter issues when running on pangeo resources and it will be useful for others to try and reproduce them. Does Pangeo have any publicly accessible resources we could use to try and reproduce the issues you run into? I know there's Pangeo cloud and Pangeo's binderhub, but I don't have a good sense for if these are appropriate for this use case

jrbourbeau commented 3 years ago

Also cc @ian-r-rose

jbusecke commented 3 years ago

Stepping back a bit, I suspect that there will be times in the future when you encounter issues when running on pangeo resources and it will be useful for others to try and reproduce them. Does Pangeo have any publicly accessible resources we could use to try and reproduce the issues you run into? I know there's Pangeo cloud and Pangeo's binderhub, but I don't have a good sense for if these are appropriate for this use case

I did run these on the Pangeo Cloud. It only requires a sign up. This would be a good place for all of us to be able to have the same playing field?

jbusecke commented 3 years ago

Thank you very much for the suggestions. Will try them now.

jrbourbeau commented 3 years ago

I did run these on the Pangeo Cloud. It only requires a sign up.

Great! I'll sign up now. Time to dust of my old ORCID...

jbusecke commented 3 years ago

Using

with dask.config.set({"optimization.fuse.active": False}):
    ds.to_zarr(mapper)

indeed cut the wait time down from ~4 min to less than 1 min! Ill try to check that in my full blown workflow to see if this has a similar effect.

rabernat commented 3 years ago

I'm curious about the tradeoffs of bypassing optimizations. They might make the computation start faster...but will it run slower?

jbusecke commented 3 years ago

I didnt run them to completion, but will now 😁

jrbourbeau commented 3 years ago

They might make the computation start faster...but will it run slower?

This is a great question to ask! In general things will be slower. Specifically, here are all the array optimizations that are skipped when "optimization.fuse.active" is turned off. Exactly how much slower things are depends on the particular computation -- though I suspect the last optimization, optimize_slices, is particularly useful for common Xarray workloads.

Either moving these optimizations to be at the HighLevelGraph level (similar to the cull optimization here), or removing the need for a particular optimization altogether with improvements in the distributed scheduler, are part of the ongoing scheduler performance improvement effort (Matt gave a recent talk on this topic and here's a blog post which outlines the main parts of these efforts). Ultimately we want to remove the need for the "optimization.fuse.active" config option, but we're not there yet.

I was mostly interested in turning off "optimization.fuse.active" to get a sense for how much of a bottleneck graph transmission from the client to the scheduler is or isn't.

mrocklin commented 3 years ago

The biggest thing that you'll miss from losing fusion is probably slicing fusion. Dask/Xarray co-evolved a lot of logic to allow slicing on HDF5/NetCDF files to only read in what was necessary. If you're doing full-volume or at least full-chunk data processing then I don't think that you're likely to miss much.

On Tue, Mar 2, 2021 at 5:25 PM James Bourbeau @.***> wrote:

They might make the computation start faster...but will it run slower?

This is a great question to ask! In general things will be slower. Specifically, here are all the array optimizations that are skipped https://github.com/dask/dask/blob/8663c6b7813fbdcaaa85d4fdde04ff42b1bb6ed0/dask/array/optimization.py#L53-L76 when "optimization.fuse.active" is turned off. Exactly how much slower things are depends on the particular computation -- though I suspect the last optimization, optimize_slices, is particularly useful for common Xarray workloads.

Either moving these optimizations to be at the HighLevelGraph level (similar to the cull optimization here https://github.com/dask/dask/blob/8663c6b7813fbdcaaa85d4fdde04ff42b1bb6ed0/dask/highlevelgraph.py#L801), or removing the need for a particular optimization altogether with improvements in the distributed scheduler, are part of the ongoing scheduler performance improvement effort (Matt gave a recent talk on this topic https://www.youtube.com/watch?v=vZ3R1DxTwbA&t and here's a blog post https://blog.dask.org/2020/07/21/faster-scheduling which outlines the main parts of these efforts). Ultimately we want to remove the need for the "optimization.fuse.active" config option, but we're not there yet.

I was mostly interested in turning off "optimization.fuse.active" to get a sense for how much of a bottleneck graph transmission from the client to the scheduler is or isn't.

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/ocean-transport/coiled_collaboration/issues/2#issuecomment-789296799, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTFO3LG6CM53ADDP4RDTBVXVHANCNFSM4YPUFQ2Q .

mrocklin commented 3 years ago

I'm curious, does this problem still persist when turning off fusion? If there is something else going on here then I'd like to get to the bottom of it. If not then I would encourage this group to start operating without fusion (I think that you'll be ok) and we can work towards making that the default on our end.

jbusecke commented 3 years ago

I havent had time to get back to those test cases yet. In other workflows I have not really noticed this anymore, but Ill try to confirm soonish (backed up by paper revisions this/next week).