fjetter opened this issue 1 year ago
Hello, this is very cool to see, and I'm excited to see it being applied to arrays!
In my Xarray-centric experience, the shuffle is usually expressed as out-of-order indexing of a dask.Array (e.g. indexing out all elements belonging to a group in a groupby operation and presenting that to the user as a new array). See this comment and below for some discussion. Currently this just results in an explosion of tasks, and is only workable for a limited set of problems.
For built-in reductions, we can use a dask.array.reduction-style approach (see flox), but we still need this kind of shuffling for GroupBy.map, much like a dataframe.
Is it feasible for the indexing/slicing code to use the shuffle for such cases?
> Is it feasible for the indexing/slicing code to use the shuffle for such cases?
So far, we have implemented this only for the arr.rechunk API call and haven't considered more generic slicing. However, the biggest lift was not the rechunk API but rather figuring out how to chop up the array and piece it back together properly, but that's basically slicing :)
We are currently generating a plan (this is all subject to change) that calculates the "target chunk" for every slice of an input chunk: https://github.com/dask/distributed/blob/f4328bbd919a08fc351a7514bf7895e14a5de7a4/distributed/shuffle/_rechunk.py#L134-L162
Effectively this is just a mapping like old_chunk_ix -> list of (new_chunk_ix, slice), i.e. if you can express an operation like this, we can use this P2P backend.
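To make that mapping concrete, here is a rough, simplified sketch of how such a plan could be computed for a single dimension (an illustration only, not the actual code in distributed/shuffle/_rechunk.py):

```python
import itertools

def rechunk_plan_1d(old_chunks, new_chunks):
    """Map each old chunk index to the (new_chunk_ix, slice) pairs it contributes."""
    old_bounds = [0, *itertools.accumulate(old_chunks)]
    new_bounds = [0, *itertools.accumulate(new_chunks)]
    plan = {i: [] for i in range(len(old_chunks))}
    for i, (ostart, ostop) in enumerate(zip(old_bounds, old_bounds[1:])):
        for j, (nstart, nstop) in enumerate(zip(new_bounds, new_bounds[1:])):
            lo, hi = max(ostart, nstart), min(ostop, nstop)
            if lo < hi:  # old chunk i overlaps new chunk j
                # the slice is expressed in the old chunk's local coordinates
                plan[i].append((j, slice(lo - ostart, hi - ostart)))
    return plan

# Rechunking (4, 4) -> (2, 6) along one axis gives:
# {0: [(0, slice(0, 2)), (1, slice(2, 4))], 1: [(1, slice(0, 4))]}
print(rechunk_plan_1d((4, 4), (2, 6)))
```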
The only difference between rechunk and generic slicing (IIUC) is that rechunk typically slices contiguously. The extension currently doesn't care about (or optimize for) this, so I am pretty confident we can express generic indexing as well using this. There are very likely cases where the old-style indexing is still better, so a challenge will be figuring out which algorithm to choose.
@dcherian is arr.rechunk also useful for you, or will you only be able to try this once we implement indexing/slicing?
> how to chop up the array and piece it back together properly, but that's basically slicing :)
That's what I was thinking ;)
> @dcherian is arr.rechunk also useful for you, or will you only be able to try this once we implement indexing/slicing?
Yes, it is useful, but I'll have to make up a use-case. It has been such a problem for so long that we try to avoid it as much as possible, or use rechunker to create a second copy of the dataset.
The other common place this bites us is arr.reshape (or stack with xarray). Example: https://github.com/dask/dask/issues/5544 (EDIT: there are so many issues pinging this one!)
cc @TomNicholas @jbusecke @rabernat
I think that the right approach here is to start to generalize out the shuffle system to the point where a sharp and adventurous developer, like @dcherian, could try implementing his own operation. This will also likely come up with map_overlap with the imaging people.
My sense is that there is likely some abstraction that could be designed, similar to Aggregation but more complex, that could enable folks outside of @hendrikmakait and @fjetter to engage effectively here.
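Purely as a strawman, such an abstraction might look something like the sketch below; none of these names exist in dask or distributed today, it is only meant to illustrate the shape of the idea:

```python
from dataclasses import dataclass
from typing import Any, Callable, Hashable


@dataclass
class P2POperation:
    """Hypothetical description of a generic P2P (many-to-many) operation."""

    # Split one input partition/chunk into shards, keyed by output partition id.
    split: Callable[[Any], dict[Hashable, Any]]
    # Reassemble all shards destined for one output partition into that output.
    combine: Callable[[list[Any]], Any]


# A rechunk would use: split = "slice the chunk according to the plan",
# combine = "concatenate received slices into the new chunk"; a dataframe
# shuffle would use: split = "group rows by output partition", combine = "concat".
```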
This looks very exciting! I have a test problem I would like to try it out on. What's the easiest way for me to do that? Use Coiled with distributed>2023.3.0?
@TomNicholas: Yes, using distributed>=2023.3.1 with a Coiled cluster should be the easiest way for you to go about that, though I suggest using distributed>=2023.5.0. See the instructions on the Pangeo Discourse (https://discourse.pangeo.io/t/rechunking-large-data-at-constant-memory-in-dask-experimental/3266/2) on how to make sure you're using p2p rechunking. Just note that disabling fusion is not necessary anymore if you're using distributed>=2023.5.0.
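For reference, a minimal sketch of opting into P2P rechunking via configuration, using the config keys quoted later in this thread (the cluster setup and array sizes are illustrative):

```python
import dask
import dask.array as da
from distributed import Client

client = Client()  # or a Coiled cluster

with dask.config.set({
    "array.rechunk.method": "p2p",
    "optimization.fuse.active": False,  # only needed for distributed<2023.5.0
}):
    x = da.random.random((2000, 2000), chunks=(2000, 10))
    x.rechunk((10, 2000)).sum().compute()
```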
On a related note, I'm currently improving the P2P rechunking implementation (distributed#7897). I still have to create a range of benchmarks to test these changes and to develop a decision heuristic for choosing between tasks and p2p rechunking (dask/dask#10226), so I might add your example to the list.
Please let me know if you have more common rechunking workloads you care about. Creating an issue on https://github.com/coiled/benchmarks/issues might be the best way to do that.
I'm not sure why P2P rechunking helps with Tom's test problem. Can someone explain that?
import dask.array as da
import xarray as xr
from distributed.diagnostics import MemorySampler

ms = MemorySampler()  # assumes a distributed Client is already running
ds = xr.Dataset(
    dict(
        anom_u=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
        anom_v=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
    )
)
quad = ds**2
quad["uv"] = ds.anom_u * ds.anom_v
mean = quad.mean("time")
with ms.sample():
    mean.compute()
@dcherian: I don't think it does. I've tested the problem recently, and it appears to lack rechunking of any sort. Hence P2P rechunking won't help. @TomNicholas: Do you have more information on this? Where would you have expected it to help?
Okay hmmm - in that case there must be some other change that's caused me to see a totally different performance between running an old version of distributed locally and the new version with P2P on Coiled. I'll try both on Coiled instead now to eliminate possible causes.
Okay I take it back, 'array.rechunk.method': 'tasks' and 'array.rechunk.method': 'p2p' behaved indistinguishably on this problem when I tried both with distributed main via Coiled.
> Okay I take it back, 'array.rechunk.method': 'tasks' and 'array.rechunk.method': 'p2p' behaved indistinguishably on this problem when I tried both with distributed main via Coiled.
If you ran this on Coiled, can you provide us with the cluster ID?
> If you ran this on Coiled, can you provide us with the cluster ID?
232904 for the run without P2P and 232905 for the run with P2P.
These two runs both look pretty much the same. Were you able to reproduce the totally different performance you mentioned here?
The performance I originally saw when I ran with a local cluster without p2p looked like this.
I'm wondering now whether I actually somehow accidentally switched from measuring total cluster memory usage of all types (including spilled) to just measuring process memory when I moved to Coiled. That would explain the difference in the shape of the graphs.
Now on Coiled I get this plot, where the red is spilling, which is consistent with my first result.
EDIT: Actually I now realise what happened - I had set 'distributed.worker.memory.spill': False when I originally ran it locally (to force a crash on the large run I expected to fail, instead of having it drag on interminably). My bad :facepalm:
Today I tried P2P rechunking to rechunk a 3D array (chunked at 128x128x128) into an optimized, directional (1x1024x1024) variant. Both the input and the output are Zarr.
With array-based P2P rechunking, I noticed the shuffle barrier is the primary bottleneck (i.e. it seems like it makes a copy of the entire array on disk? I saw huge disk write usage while it wasn't writing anything to the target array).
Am I doing something incorrectly, or are my expectations wrong? I would expect (N, 1024, 1024) sized pieces to be written out whenever they're ready and freed from temporary cache. The memory usage is definitely consistent, but it doesn't flush anything to the new Zarr array until the whole dataset is read. Any suggestions on the best way to diagnose what is going on?
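A rough sketch of the kind of workload described above (the paths and chunk sizes are illustrative placeholders, and a distributed Client is assumed to be running):

```python
import dask
import dask.array as da

with dask.config.set({"array.rechunk.method": "p2p"}):
    src = da.from_zarr("input.zarr")        # e.g. chunked at (128, 128, 128)
    dst = src.rechunk((1, 1024, 1024))      # directional, depth-1 output chunks
    dst.to_zarr("output.zarr", overwrite=True)
```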
@tasansal What you are describing is indeed what is expected. Before we can write out any of the output chunks, we have to read the entire dataset in; otherwise an output chunk may still be incomplete when we write it out.
That's probably a case where task-based rechunking is better. In general my guess is that we'll probably prefer task-based rechunking when the number of stages in the plan is minimal, and prefer p2p-based rechunking when there are more stages in the plan. Those cases have full copies regardless.
@fjetter I understand. However, that would reduce the utility of P2P shuffling for many tasks, no? Is it possible to have multiple shuffle barriers based on the chunking? What I mean is: if I have an array that is shaped (128, 128, 128) and chunked at (16, 16, 16), and I want to re-chunk it to (4, 64, 64), the problem can be broken down so that the computation only uses the disk for (16, 64, 64) pieces at a time (see the sketch below). Or ideally, some logic like the Rechunker Algorithm. You may ask, why not just use rechunker; I have used it in the past and it has some quirks, and it would be much better if Dask did it for us in a bigger, maintained codebase 😃
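A hedged sketch of the "intermediate chunks" idea described above (similar in spirit to rechunker's algorithm); this simply stages the rechunk manually and is not how P2P currently works:

```python
import dask.array as da

arr = da.zeros((128, 128, 128), chunks=(16, 16, 16))

# One-shot rechunk: every output chunk depends on many input chunks.
direct = arr.rechunk((4, 64, 64))

# Staged rechunk via an intermediate chunking that is compatible with both the
# source and the target, so each stage only needs to materialize
# (16, 64, 64)-sized pieces at a time.
staged = arr.rechunk((16, 64, 64)).rechunk((4, 64, 64))
```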
@mrocklin That makes sense. Is there a way to limit/adjust the memory usage of task-based re-chunking? I have tried it with the new worker-saturation config and inlining the Zarr arrays, but it still uses much more memory than expected and spills.
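For reference, the worker-saturation setting referred to above is a real dask config knob that limits how many extra root tasks are queued onto each worker relative to its thread count (the value here is just an example; it is typically set before the cluster is created):

```python
import dask

dask.config.set({"distributed.scheduler.worker-saturation": 1.0})
```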
Also, probably out of scope, but caching the full float32 array on disk doesn't work well with compressed formats like Zarr. It can potentially take 2-10x more disk space than users expect. Should this be documented? Or could the cache use compression?
> @fjetter I understand. However, that would reduce the utility of P2P shuffling for many tasks, no?
The benefit is indeed situational; that's also why we haven't made it the default (as opposed to P2P shuffles for dataframes). Finding a good heuristic has also proven surprisingly hard and depends on the underlying hardware (https://github.com/dask/dask/issues/10226).
> Is it possible to have multiple shuffle barriers based on the chunking?
We've considered the possibility of using multiple barriers, but it is not a high priority. The way P2P is currently built, it would still force you to roundtrip all data through disk even if it fits into memory. So before solving the algorithmic problem, we need to fix this one. Otherwise, you likely wouldn't benefit much from the speedups because rechunking would still be disk-bound. #7618 is a first step in that direction. All that being said, let us know if you would like to get involved in the effort to improve P2P rechunking. There's much to be done :)
> Also, probably out of scope, but caching the full float32 array on disk doesn't work well with compressed formats like Zarr. It can potentially take 2-10x more disk space than users expect. Should this be documented? Or could the cache use compression?
Given the current strategy of appending to the files on disk, I doubt that we would gain much from compression. Documenting this might be a good idea; would you be interested in creating a PR for this?
> Or ideally, some logic like the Rechunker Algorithm.
> Is there a way to limit/adjust the memory usage of task-based re-chunking?
@tasansal if you're not already aware you might be interested in cubed, which is a generalisation of rechunker (and ships a version of rechunker within it).
> @mrocklin That makes sense. Is there a way to limit/adjust the memory usage of task-based re-chunking? I have tried it with the new worker-saturation config and inlining the Zarr arrays, but it still uses much more memory than expected and spills.
@tasansal can I ask you for an example problem that people can look at? Is something like the following representative of your problem?
import dask.array as da
x = da.random.random((2000, 2000, 2000), chunks=(128, 128, 128))
x.rechunk(("auto", 1000, 1000)).to_zarr(...)
I'm also curious, what is actually stopping you from using this today? Is it slow performance? (If so, I'm curious what you're using today that has better performance). Is it running out of memory? (that seems unlikely given the p2p solution). Is it running out of disk?
What I'm hearing now is "this could be better" and I entirely agree with that. I'm also curious about what specifically is causing you pain, and what, if anything, makes it insufficient.
I ran this both locally and on the cloud, and I think I'm hitting hardware performance limits. Video here in case people are interested.
Probably my version of the problem isn't representative of what you're running into @tasansal . If you can help by providing a representative example that would be really useful.
Hi all, I'm having the same issue. A small way to reproduce this is:
import dask
import dask.array as dsa
dask.config.set({"array.rechunk.method": "p2p", "optimization.fuse.active": False})
shape = (1500, 700, 720*20)
chunks = (1500, 700, 720)
data = dsa.random.random(shape, chunks=chunks)
data_rc = data.rechunk((1500, 700, 300))
data_rc.compute() # or save to disk
The real-world example that this is trying to imitate is opening 20 netCDFs (monthly files of hourly data) and rechunking to fixed length-300 chunks (then saving to zarr). The resulting dask graph using p2p has the same shuffle block that leads to memory spill and an eventual crash.
Context for why I was trying p2p in the first place: the non-p2p dask graph for this task has a different failure mode, where the order in which rechunk-splits and merges run is shuffled for reasons I don't understand. Using the toy code above with p2p disabled, I get what I expect: a very neat dask graph with a well-optimized task order.
However, in my real-world example I instead get an unexpected shuffling of the task ordering (represented by the diagonals below) that results in tasks being held in memory much longer than would be necessary under the more optimal ordering the toy example produces.
I can't reproduce this exactly without the data but it's just a bunch of standard netcdfs:
import glob

import xarray as xr

files = glob.glob(args.data_location)  # args parsed elsewhere in the script
ds = xr.open_mfdataset(files[:20], inline_array=True)
ds = ds.chunk({"latitude": -1, "longitude": -1, "time": 300})
ds.to_zarr('/data/dummy.zarr')
Interesting, I don't suppose there's a way to reproduce this, even if in a non-minimal way?
My sense is that for rechunking that isn't all-to-all (like (1000, 1) to (1, 1000)) the task-based mechanism will be near-optimal. If there is some ordering issue then we should try to hunt that down.
Annoyingly, my attempt at a reproducer with public data has led to the graph I expect :/
import s3fs
import xarray as xr
from dask.distributed import Client, LocalCluster
local_dir = '/data/era5-test/'
s3 = s3fs.S3FileSystem(anon=False)
for f in s3.glob('s3://era5-pds/1980/*/data/air_temperature_at_2_metres.nc'):
savename = local_dir + f[9:]
s3.download(f, savename)
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='28GB')
client = Client(cluster)
ds = xr.open_mfdataset(local_dir+'*/*/data/*.nc', inline_array=True)
ds = ds.chunk({'lat':-1,'lon':-1,'time0':300})
ds.air_temperature_at_2_metres.data.visualize(optimize_graph=True, color='order')
ds.to_zarr(local_dir+'output.zarr')
Ah, managed to reproduce with just adding a few more files. Also I think I'm hijacking @fjetter's thread here, so lmk if I should just open a new issue.
Code is as follows (takes ~5 minutes to download depending on zone, and requires ~20 GiB of disk space). Interesting that this only shows up when I have 24 files instead of the 12 above.
import s3fs
import xarray as xr
from dask.distributed import Client, LocalCluster
local_dir = '/data/era5-test/'
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='28GB')
client = Client(cluster)
s3 = s3fs.S3FileSystem(anon=False)
for year in [1980,1981]:
for f in s3.glob(f's3://era5-pds/{year}/*/data/air_temperature_at_2_metres.nc'):
savename = local_dir + f[9:]
s3.download(f, savename)
ds = xr.open_mfdataset(local_dir+'*/*/data/*.nc', inline_array=True)
ds = ds.chunk({'lat':-1,'lon':-1,'time0':300})
ds.air_temperature_at_2_metres.data.visualize(optimize_graph=True, color='order')
ds.to_zarr(local_dir+'output.zarr')
So here you can see those funky parallels again. The rechunk-merge at bottom center that's held in memory is a consequence of the funky task ordering here (represented by those diagonals); do this for 500 files and those effects aggregate to crash this otherwise straightforward job.
> ds.air_temperature_at_2_metres.data.visualize(optimize_graph=True, color='order')
Can you share the result of this call? You'll need to include a filename:
ds.air_temperature_at_2_metres.data.visualize("myfile.png", optimize_graph=True, color='order')
The .visualize method gives a much more informative/careful visualization than the Bokeh chart does (the Bokeh chart has to update every 200ms, so it doesn't try as hard).
Ah, sorry, was running in a notebook so that bit was interactive. Here is the resulting chart:
🤔 someone else should take a look at that, but it's not immediately clear to me that that's a fail case just yet. Looking at the numbers, it seems like maybe the graphviz/dot layout is a bit off?
There are clearly lines swooping around that look off, but when you check the numbers and colors it seems like Graphviz has placed things a bit oddly anyway (everyone uses heuristics for this problem).
Are you able to make this look more obviously bad by adding more files?
The swooping lines in the visualize() figure aren't clear issues; I agree that they're mostly just a quirk of the plotting. Some issues that I do see: node 200 (bottom, middle right) should be running before 186, which should run before 179; I think this might be what led to this test being allowed to fail. I'm wondering if it's in conflict with the desired ordering specified in this test?
OK, a clean example; era5 sample data downloaded as above (2 years worth).
import glob

import s3fs
import xarray as xr
import zarr

files = glob.glob('/data/era5-test/*/*/data/*.nc')
ds = xr.open_mfdataset(files, inline_array=True)
ds = ds.chunk({"lat": -1, "lon": -1, "time0": 300})
s3 = s3fs.S3FileSystem(anon=False)
store = s3fs.S3Map(root=str(s3_zarr_store_path), s3=s3, check=False)  # s3_zarr_store_path defined elsewhere
encoding = {vname: {"compressor": zarr.Blosc(cname='zstd', clevel=3)} for vname in ds.data_vars}
zarr_data_store_out = ds.to_zarr(store=store, encoding=encoding, consolidated=True, compute=False)
zarr_data_store_out.visualize('/home/ubuntu/gro/vis_graph_full.png', optimize_graph=True, color='order')
zarr_data_store_out.compute()
bokeh plot after a few minutes; notice the held rechunk-splits (and there are a lot more of these when scaling up 20x)
matching .visualize() plot:
I'm having a bit of a hard time aligning these two. The 0 node (representing what I understand to be a task resulting from the first open_dataset) shares dependent nodes with the 3 node and the 15 node; by 'root' node ordering, those should be the 2nd and 4th 'root' nodes processed (0, 3, 9, 15). Yet the bokeh plot shows that the first 'root' node (lowest on the left; the left column here is processed monotonically bottom to top) shares a dependent node with the 10th 'root' node (left column, 10 from the top). So where am I misunderstanding the relationship between the static plot and the runtime dask graph?
Just checking in here, I've gotten pulled away this week. I think that @fjetter plans to think about this soon though. It's near the top (but not yet at the top) of his priority list.
Motivation
Shuffles are an integral part of many distributed data manipulation algorithms. Common DataFrame operations relying on shuffling include sort, merge, set_index, or various groupby operations (e.g. groupby().apply(), groupby(split_out>1)), whereas the most stereotypical array workload is the rechunk. There are many other applications for an efficient shuffle implementation, which justifies taking a dedicated approach to solve this issue.

Shuffling is a poor fit for centralized graph-based scheduling, since the graph is all-to-all (naive O(N²), dask O(N log N) in size, where N is the number of partitions), yet the core logic of a shuffle is so simple that it benefits little from centralized coordination while suffering significant overhead from it. With task-based shuffles, the amount of data we can shuffle effectively (before workers run out of memory, or the scheduler crashes or becomes a bottleneck) is severely limited. Allowing workers to autonomously exchange data with their peers and manage disk and memory usage in a more granular way allows us to push that limit significantly higher.
See https://coiled.io/blog/better-shuffling-in-dask-a-proof-of-concept/ for more background.
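As a rough illustration of how quickly an all-to-all rechunk graph grows under task-based scheduling (the array sizes here are arbitrary, and exact task counts depend on the dask version):

```python
import dask.array as da

x = da.random.random((10_000, 10_000), chunks=(10_000, 100))  # 100 column chunks
y = x.rechunk((100, 10_000))                                   # 100 row chunks

# The materialized graph contains far more tasks than the 200 input/output
# chunks, which is the overhead a P2P shuffle avoids handing to the scheduler.
print(len(y.__dask_graph__()))
```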
This issue tracks the current implementation progress and highlights various milestones. We intend to update the top-level description of this issue continuously such that this issue can serve as an always up-to-date overview of the current efforts.
Goals
Roadmap
1 - Foundations and dask.DataFrame ✅
The implementation effort so far has focused on creating a stable foundation for the things to come and derives from the early prototype. This stage mostly focused on a consistent concurrency model that supports out-of-band, direct peer-to-peer communication between workers and integrates well with the existing task-based scheduling logic.
This was developed using a DataFrame-based shuffle as the example, and we consider this ready to use! For detailed instructions, known issues, and feedback, please see https://github.com/dask/distributed/discussions/7509. We encourage all users of dask.DataFrame to try this and report back with feedback.

2 - dask.Array rechunking
The new shuffle extension is currently built to handle pandas DataFrames and uses pyarrow behind the scenes. Its architecture is built with generic types in mind and will be suited just as well for array workloads. One of the most popular many-to-many problems is array rechunking, which we will implement next using this extension.
Basic functionality is being set up in https://github.com/dask/distributed/pull/7534
This approach already provides constant-memory array rechunking but sometimes falls short in terms of walltime performance compared to old-style task-based shuffling.
3 - Misc
This next stage is not as refined as the initial ones. There are many small- to medium-sized issues that will either expand adoption of the P2P algorithm or make it run faster and smoother. This section will become more refined over time.