pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License
163 stars 25 forks source link

Algorithm for multi-stage rechunking #89

Closed shoyer closed 1 year ago

shoyer commented 3 years ago

Background

Rechunker's current "Push / Pull Consolidated" algorithm can be thought of as a mix of "split" and "combine" steps:

This is pretty clever, but can run into scalability issues. In particular, sometimes int_chunks must be very small, which results in a significant overhead for reading/writing small files (https://github.com/pangeo-data/rechunker/issues/80).

Proposal

I think the right way to fix this is to extend Rechunker's algorithm to allow for multiple split/combine stages -- as many as necessary, to avoid creating tiny intermediate chunk sizes. This PR implements the math for such as algorithm, in a fully backwards compatible fashion. Users can control the number of stages via the new min_mem parameter, which specifies a minimum chunk size in bytes.

Multi-stage rechunking is not yet hooked up to any of Rechunker's executors. I'm proposing adding it to rechunking because it reuses/modifies the existing code, and I need the math for rechunking inside Xarray-Beam (this was an easier way to explore writing the Beam executor part). Unfortunately it isn't easy to add into Rechunker's existing model, which stores intermediate results in Zarr, because multi-stage rechunking (at least this version) requires irregular chunks.

Dask also does multi-stage rechunking, which brought significant efficiency gains (https://github.com/dask/dask/issues/417). I considered copying Dask's rechunk planning algorithm here, but it involves a lot of complex logic so I decided to try replacing it with a simple heuristic instead. See algorithm.py for details.

Example

My specific motivation for this PR is experimenting with rechunking on Pangeo's ERA5 single-level dataset, which contains a number of 1.5 TB variables (at least once decoded into float32). Rechunking these arrays with shape=(350640, 721, 1440) from "whole image" chunks (31, 721, 1440) to "whole time-series" chunks (350640, 10, 10) with Rechunker's current algorithm produces a very large number of small chunks. It works, but seems much slower than it should be.

I wrote a little Python script to compare Rechunker's current method (min_mem=0), with my proposed multi-stage method (min_mem=10MB) and Dask's rechunking method:

```python from rechunker import algorithm import numpy as np import sys import math from rechunker.compat import prod def evaluate_stage_v2(shape, read_chunks, int_chunks, write_chunks): tasks = algorithm.calculate_single_stage_io_ops(shape, read_chunks, write_chunks) read_tasks = tasks if write_chunks != read_chunks else 0 write_tasks = tasks if read_chunks != int_chunks else 0 return read_tasks, write_tasks def evaluate_plan(stages, shape, itemsize): total_reads = 0 total_writes = 0 for i, stage in enumerate(stages): read_chunks, int_chunks, write_chunks = stage read_tasks, write_tasks = evaluate_stage_v2( shape, read_chunks, int_chunks, write_chunks, ) total_reads += read_tasks total_writes += write_tasks return total_reads, total_writes def print_summary(stages, shape, itemsize): for i, stage in enumerate(stages): print(f"stage={i}: " + " -> ".join(map(str, stage))) read_chunks, int_chunks, write_chunks = stage read_tasks, write_tasks = evaluate_stage_v2( shape, read_chunks, int_chunks, write_chunks, ) print(f" Tasks: {read_tasks} reads, {write_tasks} writes") print(f" Split chunks: {itemsize*np.prod(int_chunks)/1e6 :1.3f} MB") total_reads, total_writes = evaluate_plan(stages, shape, itemsize) print("Overall:") print(f' Reads count: {total_reads:1.3e}') print(f' Write count: {total_writes:1.3e}') # dask.array.rechunk is the function rechunk_module = sys.modules['dask.array.rechunk'] def dask_plan(shape, source_chunks, target_chunks, threshold=None): source_expanded = rechunk_module.normalize_chunks(source_chunks, shape) target_expanded = rechunk_module.normalize_chunks(target_chunks, shape) # Note: itemsize seems to be ignored, by default stages = rechunk_module.plan_rechunk( source_expanded, target_expanded, threshold=threshold, itemsize=4, ) write_chunks = [tuple(s[0] for s in stage) for stage in stages] read_chunks = [source_chunks] + write_chunks[:-1] int_chunks = [algorithm._calculate_shared_chunks(r, w) for r, w in zip(write_chunks, read_chunks)] return list(zip(read_chunks, int_chunks, write_chunks)) def rechunker_plan(shape, source_chunks, target_chunks, **kwargs): stages = algorithm.multistage_rechunking_plan( shape, source_chunks, target_chunks, **kwargs ) return ( [(source_chunks, source_chunks, stages[0][0])] + list(stages) + [(stages[-1][-1], target_chunks, target_chunks)] ) itemsize = 4 shape = (350640, 721, 1440) source_chunks = (31, 721, 1440) target_chunks = (350640, 10, 10) print(f'Total size: {itemsize*np.prod(shape)/1e12:.3} TB') print(f'Source chunk count: {np.prod(shape)/np.prod(source_chunks):1.3e}') print(f'Target chunk count: {np.prod(shape)/np.prod(target_chunks):1.3e}') print() print("Rechunker plan (min_mem=0, max_mem=500 MB):") plan = rechunker_plan( shape, source_chunks, target_chunks, itemsize=4, min_mem=0, max_mem=int(500e6) ) print_summary(plan, shape, itemsize=4) print() print("Rechunker plan (min_mem=10 MB, max_mem=500 MB):") plan = rechunker_plan( shape, source_chunks, target_chunks, itemsize=4, min_mem=int(10e6), max_mem=int(500e6) ) print_summary(plan, shape, itemsize=4) print() print("Dask plan (default):") plan = dask_plan(shape, source_chunks, target_chunks) print_summary(plan, shape, itemsize=4) ```
Total size: 1.46 TB
Source chunk count: 1.131e+04
Target chunk count: 1.038e+04

Rechunker plan (min_mem=0, max_mem=500 MB):
stage=0: (31, 721, 1440) -> (31, 721, 1440) -> (93, 721, 1440)
  Tasks: 11311 reads, 0 writes
  Split chunks: 128.742 MB
stage=1: (93, 721, 1440) -> (93, 10, 30) -> (350640, 10, 30)
  Tasks: 13213584 reads, 13213584 writes
  Split chunks: 0.112 MB
stage=2: (350640, 10, 30) -> (350640, 10, 10) -> (350640, 10, 10)
  Tasks: 10512 reads, 10512 writes
  Split chunks: 140.256 MB
Overall:
  Reads count: 1.324e+07
  Write count: 1.322e+07

Rechunker plan (min_mem=10 MB, max_mem=500 MB):
stage=0: (31, 721, 1440) -> (31, 721, 1440) -> (93, 721, 1440)
  Tasks: 11311 reads, 0 writes
  Split chunks: 128.742 MB
stage=1: (93, 721, 1440) -> (93, 173, 396) -> (1447, 173, 396)
  Tasks: 80220 reads, 80220 writes
  Split chunks: 25.485 MB
stage=2: (1447, 173, 396) -> (1447, 41, 109) -> (22528, 41, 109)
  Tasks: 96492 reads, 96492 writes
  Split chunks: 25.867 MB
stage=3: (22528, 41, 109) -> (22528, 10, 30) -> (350640, 10, 30)
  Tasks: 86864 reads, 86864 writes
  Split chunks: 27.034 MB
stage=4: (350640, 10, 30) -> (350640, 10, 10) -> (350640, 10, 10)
  Tasks: 10512 reads, 10512 writes
  Split chunks: 140.256 MB
Overall:
  Reads count: 2.854e+05
  Write count: 2.741e+05

Dask plan (default):
stage=0: (31, 721, 1440) -> (31, 721, 1440) -> (32, 721, 1440)
  Tasks: 21915 reads, 0 writes
  Split chunks: 128.742 MB
stage=1: (32, 721, 1440) -> (32, 160, 480) -> (302, 160, 480)
  Tasks: 180705 reads, 180705 writes
  Split chunks: 9.830 MB
stage=2: (302, 160, 480) -> (302, 80, 150) -> (2922, 80, 150)
  Tasks: 153720 reads, 153720 writes
  Split chunks: 14.496 MB
stage=3: (2922, 80, 150) -> (2922, 20, 60) -> (29220, 20, 60)
  Tasks: 128760 reads, 128760 writes
  Split chunks: 14.026 MB
stage=4: (29220, 20, 60) -> (29220, 10, 10) -> (350640, 10, 10)
  Tasks: 126144 reads, 126144 writes
  Split chunks: 11.688 MB
Overall:
  Reads count: 6.112e+05
  Write count: 5.893e+05

Comparing my new multi-stage algorithm (max_mem=10MB) to Rechunker's existing algorithm (max_mem=0), ​the multi-stage pipeline does two extra dataset copies, but reduces the number of IO operations by ~50x.

Comparing my new algorithm to Dask's algorithm, the plans actually look remarkably similar. My estimates suggest that my algorithm should involve about half the number of IO operations, but Dask's plan uses slightly "nicer" chunk sizes. I have no idea which is better is practice, and note that I'm using Dask's algorithm without adjusting any of the control knobs.

I have not yet benchmarked any of these algorithms on real rechunking tasks. See below for benchmarking results from Xarray-Beam, for which it significantly improves real-world performance.

codecov[bot] commented 3 years ago

Codecov Report

Merging #89 (847766f) into master (9d12932) will increase coverage by 0.32%. The diff coverage is 97.01%.

@@            Coverage Diff             @@
##           master      #89      +/-   ##
==========================================
+ Coverage   96.02%   96.35%   +0.32%     
==========================================
  Files          11       11              
  Lines         503      548      +45     
  Branches      112      105       -7     
==========================================
+ Hits          483      528      +45     
  Misses         13       13              
  Partials        7        7              
Impacted Files Coverage Δ
rechunker/api.py 97.12% <75.00%> (-0.92%) :arrow_down:
rechunker/algorithm.py 91.22% <100.00%> (+5.51%) :arrow_up:
rechunker/compat.py 100.00% <100.00%> (+22.22%) :arrow_up:

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

shoyer commented 3 years ago

Thinking about this a little bit more, this may be a harder problem for the rechunker data model than I thought. The problem is that my algorithm does not guarantee that intermediate chunks can be written to Zarr.

For example, consider stage 1 in my proposed rechunking plan above: (93, 721, 1440) -> (93, 173, 396) -> (1447, 173, 396). 1447 is not a multiple of 93 -- they have a greatest common divisor of 1 -- so even though the typical chunk along the first axes in the intermediates would be 93, in the worst case we'd have to save a single element along that axis.

In Xarray-Beam and Dask, this is handled by producing temporary irregular chunks from the "split" step. This is fine in principle, but won't work for the current version of Rechunker, which stores intermediates in Zarr.

I see a few possible ways to resolve this:

  1. Extend Zarr to support irregular chunks
  2. Switch Rechunker to store irregular chunks in raw Zarr stores (or a filesystem) rather than as Zarr arrays
  3. Figure out another multi-stage algorithm that deals with these "regular multiple" constraints (this seems hard).
rabernat commented 3 years ago

Thanks a lot for this Stephan! It will take me a couple of days to digest this.

shoyer commented 3 years ago

I tested this in Xarray-Beam today, rechunking a single 1.3 TB float32 array from (31, 721, 1440) -> (350640, 10, 10) with max_mem=256e6. This is large enough that Rechunker doesn't do any additional consolidation of reads/writes:

Edit: you can find the example code for this over in https://github.com/google/xarray-beam/pull/14

shoyer commented 1 year ago

I am happy to rebase, but would anyone be up for reviewing this PR?

The alternative would be to fork this code into Xarray-Beam.

shoyer commented 1 year ago

@rabernat any thoughts here?

rabernat commented 1 year ago

HI Stephan! Sorry for letting this lie dormant for so long. Will definitely take a look this week. Thanks for your patience.

shoyer commented 1 year ago

HI Stephan! Sorry for letting this lie dormant for so long. Will definitely take a look this week. Thanks for your patience.

Thanks!

I just merged in "master" so this should be ready for review.

shoyer commented 1 year ago

I In particular, I am not quite comprehending the what gives rise to the issue of irregular intermediate chunks, which I believe is linked to the least-common multiple calculation.

This is a good point! The code as I wrote it was rather misleading -- the LCM logic is indeed just about counting IO ops.

Irregular chunks only become necessary because of the very simple algorithm (geometric spacing) I use for picking intermediate chunk sizes in multi-stage plans. With some cleverness, this could likely be avoided in many cases.

The current rechunker algorithm does actually support "irregular chunks" in the form of overlapping reads of source arrays. So it's certainly possible to use this limited form of irregular chunking to support arbitrary rechunks, and I suspect with effort we could make it work for multi-stage rechunks, too. The most obvious way to do this would be to simply pick some intermediate chunk size, and then run the existing rechunker algorithm recursively twice, for three total temporary Zarr arrays (or 7 or 15). This would be quite a bit less flexible than what I implemented here, though.

rabernat commented 1 year ago

Fantastic. These comments are a big help. Thanks for this contribution.