pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License

Assembling lots of very small chunks #147

Closed: max-sixty closed this issue 11 months ago

max-sixty commented 11 months ago

I have a process that writes out data in very small chunks, using xr.Dataset.to_zarr(region=...). I then want to assemble the chunks into more reasonably sized chunks.

An order of magnitude on the sizes: the dataset is ~14TB and the chunks are ~1MB, so we have about 14M chunks. Lots of chunks...
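
For context, the writing side looks roughly like this (a sketch with illustrative names, shapes, and region bounds, not the actual pipeline):

import numpy as np
import xarray as xr

# Illustrative setup: pre-create the full-size store with compute=False,
# then each producer writes its small piece into its own region.
full = xr.Dataset({"foo": ("x", np.zeros(1_000))}).chunk({"x": 100})
full.to_zarr("small_chunks.zarr", mode="w", compute=False)

piece = xr.Dataset({"foo": ("x", np.ones(100))})
piece.to_zarr("small_chunks.zarr", region={"x": slice(0, 100)})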

Rechunker seems to stall in this case: it stops making progress. When I scale the problem down to 10% or even 5% of the size, it still stalls. (By "stalls" I mean the dask scheduler seems to be primarily responsible: it gives its usual warnings about the event loop being blocked for 10-50s by an operation. It often gives those warnings at other times and still manages to succeed, whereas here rechunker makes no progress at all.)

Is this expected? Is this predominantly a dask problem, because the scheduler just can't handle a task graph with that many edges?


One approach that seems to work — but also seems so homebrew that I'm surely missing something — is to make each new chunk's work opaque to the dask scheduler:

import xarray as xr

def collect_and_write_chunks(path, region):
    # `original_path` is the store holding the small chunks; reset_encoding
    # drops the stored (tiny) chunk encoding so it doesn't constrain the write.
    original = xr.open_zarr(original_path).reset_encoding()
    to_write = (
        # Dropping the index coords and computing with the threaded
        # scheduler stops this using the main dask scheduler.
        original.drop_vars(original.indexes).isel(**region).compute(scheduler="threads")
    )
    to_write.to_zarr(path, region=region)
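
Driving that over every region might look like this (a sketch, assuming a dask.distributed Client, a hypothetical regions list, and that collect_and_write_chunks is importable on the workers):

from dask.distributed import Client

client = Client()

# One future per output chunk: the scheduler sees a single opaque task
# per region rather than millions of tiny-chunk edges.
regions = [{"x": slice(i, i + 100)} for i in range(0, 1_000, 100)]
futures = [
    client.submit(collect_and_write_chunks, "big_chunks.zarr", region)
    for region in regions
]
client.gather(futures)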

FWIW I have plenty of memory — it would be fine to just load it all into distributed memory and then write it out again. Just running da.chunk(x=100).to_zarr(path) seems to do better than rechunker, but much worse than the "opaque task" approach above.

dcherian commented 11 months ago

da.chunk(x=100).to_zarr(path)

You could xr.open_zarr(..., chunks=FINAL_CHUNKS).to_zarr(). This is a blockwise operation and should work well.

EDIT: I assumed that FINAL_CHUNKS are a multiple of the existing SMALL_CHUNKS.
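
A minimal sketch of this, with placeholder paths and chunk sizes; note the stored chunk encoding needs dropping (e.g. via reset_encoding, as in the function above), or to_zarr will complain about the new dask chunks overlapping the stored zarr chunks:

import xarray as xr

FINAL_CHUNKS = {"x": 10_000}  # hypothetical; a multiple of the existing small chunks

# Each output chunk is a blockwise read of whole input chunks, so the
# task graph stays shallow; reset_encoding clears the stored (tiny)
# chunk spec so the new chunking is what gets written.
ds = xr.open_zarr("small_chunks.zarr", chunks=FINAL_CHUNKS).reset_encoding()
ds.to_zarr("big_chunks.zarr", mode="w")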

max-sixty commented 11 months ago

You could xr.open_zarr(..., chunks=FINAL_CHUNKS).to_zarr(). This is a blockwise operation and should work well.

Ah awesome, I indeed didn't know about this. That's excellent, thanks.

max-sixty commented 11 months ago

OK, so this plus rechunker is great: pass source=xr.open_zarr(..., chunks=INTERMEDIATE_CHUNKS), where INTERMEDIATE_CHUNKS are any multiples of the base chunks.
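
A sketch of that combination (paths, chunk sizes, and max_mem are placeholders; as I understand rechunker's xarray support, target_chunks maps each variable name to its chunk dict):

import xarray as xr
from rechunker import rechunk

INTERMEDIATE_CHUNKS = {"x": 1_000}        # a multiple of the base chunks (placeholder)
TARGET_CHUNKS = {"foo": {"x": 100_000}}   # per-variable targets; "foo" is a placeholder

# Opening with intermediate chunks means the scheduler sees far fewer
# source chunks than are actually on disk, and the open itself is blockwise.
source = xr.open_zarr("small_chunks.zarr", chunks=INTERMEDIATE_CHUNKS)

plan = rechunk(
    source,
    target_chunks=TARGET_CHUNKS,
    max_mem="4GB",
    target_store="big_chunks.zarr",
    temp_store="temp.zarr",
)
plan.execute()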