pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License

Intermediate-to-target pass not respecting max_mem #115

Closed rabernat closed 2 years ago

rabernat commented 2 years ago

I started from an array of shape (175320, 721, 1440) and chunks (24, 721, 1440).

I created the following rechunking plan:

target_chunks = {
    'tp': (21915, 103, 10),
    'time': None,
    'longitude': None,
    'latitude': None,
}

max_mem = '12GB'

r = rechunk(zg, target_chunks, max_mem, target_store, temp_store=temp_store)

This produced an intermediate array with the following chunks:

r._intermediate.tp.chunks == (2880, 103, 1320)

When executing this plan, the first stage (source to intermediate) ran fine. But the second (intermediate to target) exceeded the memory limits by about 2x or more.

Then I tried rechunking directly from the intermediate to the target. I expected this to run in a single pass, without an additional intermediate, because that is how it is supposed to work. But it didn't!

zg_temp = zarr.open_group(temp_store)

target_chunks = {
    'tp': (21915, 103, 10),
}

max_mem = '12GB'
r2 = rechunk(zg_temp, target_chunks, max_mem, target_store, temp_store=None)
--------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Input In [57], in <cell line: 13>()
      4 target_chunks = {
      5     'tp': (21915, 103, 10),
      6 #    'time': None,
      7 #    'longitude': None,
      8 #    'latitude': None,
      9 }
     11 max_mem = '12GB'
---> 13 r2 = rechunk(zg_temp, target_chunks, max_mem, target_store, temp_store=None)
     14 r2

File /srv/conda/envs/notebook/lib/python3.9/site-packages/rechunker/api.py:302, in rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options, executor)
    299 if isinstance(executor, str):
    300     executor = _get_executor(executor)
--> 302 copy_spec, intermediate, target = _setup_rechunk(
    303     source=source,
    304     target_chunks=target_chunks,
    305     max_mem=max_mem,
    306     target_store=target_store,
    307     target_options=target_options,
    308     temp_store=temp_store,
    309     temp_options=temp_options,
    310 )
    311 plan = executor.prepare_plan(copy_spec)
    312 return Rechunked(executor, plan, source, intermediate, target)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/rechunker/api.py:456, in _setup_rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options)
    454 copy_specs = []
    455 for array_name, array_target_chunks in target_chunks.items():
--> 456     copy_spec = _setup_array_rechunk(
    457         source[array_name],
    458         array_target_chunks,
    459         max_mem,
    460         target_group,
    461         target_options=target_options.get(array_name),
    462         temp_store_or_group=temp_group,
    463         temp_options=temp_options.get(array_name),
    464         name=array_name,
    465     )
    466     copy_specs.append(copy_spec)
    468 return copy_specs, temp_group, target_group

File /srv/conda/envs/notebook/lib/python3.9/site-packages/rechunker/api.py:564, in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    561 else:
    562     # do intermediate store
    563     if temp_store_or_group is None:
--> 564         raise ValueError(
    565             "A temporary store location must be provided{}.".format(
    566                 f" (array={name})" if name else ""
    567             )
    568         )
    569     int_array = _zarr_empty(
    570         shape,
    571         temp_store_or_group,
   (...)
    575         **(temp_options or {}),
    576     )
    578 read_proxy = ArrayProxy(source_array, read_chunks)

ValueError: A temporary store location must be provided (array=tp).

This suggests there may be a bug in our memory allocation logic.
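
The traceback shows the error being raised from an `else:` branch, but not the condition that guards it. The following is a minimal sketch of that decision, assuming the branch is taken whenever the planned read and write chunks differ; that condition is an assumption, and `needs_intermediate` and `check_temp_store` are hypothetical helpers, not part of rechunker. The chunk shapes are the ones `rechunking_plan` reports for the intermediate-to-target pass later in this thread.

```python
def needs_intermediate(read_chunks, write_chunks):
    """Hypothetical helper: True when a temporary store would be required."""
    # Assumption: a single pass is only possible when the planned read and
    # write chunks are identical. The real condition is not visible in the
    # traceback above.
    return tuple(read_chunks) != tuple(write_chunks)


def check_temp_store(read_chunks, write_chunks, temp_store, name=None):
    """Hypothetical helper mirroring the error raised in the traceback."""
    if needs_intermediate(read_chunks, write_chunks) and temp_store is None:
        raise ValueError(
            "A temporary store location must be provided{}.".format(
                f" (array={name})" if name else ""
            )
        )


# Read and write chunks reported for the intermediate-to-target pass.
read_chunks2 = (20160, 103, 1320)
write_chunks2 = (21915, 103, 1320)

try:
    check_temp_store(read_chunks2, write_chunks2, temp_store=None, name="tp")
    message = None
except ValueError as err:
    message = str(err)

print(message)
```

Under this assumption, the error follows directly from the plan: the two chunk shapes differ along the first axis, so an intermediate is requested even though the source is already the intermediate array.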

rabernat commented 2 years ago

Here is the issue at the algorithm level:

from rechunker.algorithm import rechunking_plan
import dask

shape = (175320, 721, 1440)
source_chunks = (24, 721, 1440)
target_chunks = (21915, 103, 10)
itemsize = 4
max_mem = "12GB"

read_chunks, int_chunks, write_chunks = rechunking_plan(
    shape,
    source_chunks,
    target_chunks,
    itemsize,
    dask.utils.parse_bytes(max_mem),
    consolidate_reads=True,
)

print(read_chunks, int_chunks, write_chunks)

read_chunks2, int_chunks2, write_chunks2 = rechunking_plan(
    shape,
    int_chunks,
    target_chunks,
    itemsize,
    dask.utils.parse_bytes(max_mem),
    consolidate_reads=True,
)

print(read_chunks2, int_chunks2, write_chunks2)

This prints:

(2880, 721, 1440) (2880, 103, 1320) (21915, 103, 1320)
(20160, 103, 1320) (20160, 103, 1320) (21915, 103, 1320)

The second call starts from the intermediate chunks, so it should not need another intermediate: int_chunks2 should be None.
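
For scale, the chunk shapes printed above can be converted to bytes and compared with `max_mem`. This is only a sketch of the arithmetic: it uses `itemsize = 4` from the snippet above and assumes that "12GB" means 12 * 10**9 bytes, which is how `dask.utils.parse_bytes` treats the decimal `GB` suffix. `chunk_nbytes` is a hypothetical helper, not a rechunker function.

```python
from math import prod

ITEMSIZE = 4            # bytes per element, as in the snippet above
MAX_MEM = 12 * 10**9    # assumption: "12GB" parsed as decimal gigabytes


def chunk_nbytes(chunks, itemsize=ITEMSIZE):
    """Hypothetical helper: uncompressed size in bytes of one chunk."""
    return prod(chunks) * itemsize


# Chunk shapes copied from the two rechunking_plan outputs above.
plan = {
    "read_chunks": (2880, 721, 1440),
    "int_chunks": (2880, 103, 1320),
    "write_chunks": (21915, 103, 1320),
    "read_chunks2": (20160, 103, 1320),
}

sizes = {name: chunk_nbytes(chunks) for name, chunks in plan.items()}

for name, nbytes in sizes.items():
    print(f"{name}: {nbytes / 1e9:.2f} GB ({nbytes / MAX_MEM:.1%} of max_mem)")
```

Each individual chunk fits within the limit, with the read and write chunks sitting just under it. This does not by itself explain the overshoot seen when executing the second stage; it only shows how little headroom the plan leaves.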