pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License

Rechunker 0.3.3 is incompatible with Dask 2022.01.1 and later #110

Closed tomwhite closed 2 years ago

tomwhite commented 2 years ago

Running the rechunker 0.3.3 unit tests with Dask 2022.2.0 produces many failures like the one below.

The breakage seems to have been introduced by this HLG (high-level graph) change in Dask, first released in Dask 2022.01.1.

This is a problem because we are pinned to rechunker 0.3.3 due to #92 (see also https://github.com/pystatgen/sgkit/issues/820).

________________________________________________ test_rechunk_dataset[mapper.temp.zarr-mapper.target.zarr-dask-10MB-target_chunks1-source_chunks0-shape0] ________________________________________________

tmp_path = PosixPath('/private/var/folders/cj/wyp4zgw17vj4m9qdmddvmcc80000gn/T/pytest-of-tom/pytest-1730/test_rechunk_dataset_mapper_te3'), shape = (100, 50), source_chunks = (10, 50)
target_chunks = {'a': {'x': 20, 'y': 10}, 'b': {'x': 20}}, max_mem = '10MB', executor = 'dask', target_store = <fsspec.mapping.FSMap object at 0x7fb48ae17890>
temp_store = <fsspec.mapping.FSMap object at 0x7fb48ae17d50>

    @pytest.mark.parametrize("shape", [(100, 50)])
    @pytest.mark.parametrize("source_chunks", [(10, 50)])
    @pytest.mark.parametrize(
        "target_chunks",
        [{"a": (20, 10), "b": (20,)}, {"a": {"x": 20, "y": 10}, "b": {"x": 20}}],
    )
    @pytest.mark.parametrize("max_mem", ["10MB"])
    @pytest.mark.parametrize("executor", ["dask"])
    @pytest.mark.parametrize("target_store", ["target.zarr", "mapper.target.zarr"])
    @pytest.mark.parametrize("temp_store", ["temp.zarr", "mapper.temp.zarr"])
    def test_rechunk_dataset(
        tmp_path,
        shape,
        source_chunks,
        target_chunks,
        max_mem,
        executor,
        target_store,
        temp_store,
    ):
        if target_store.startswith("mapper"):
            target_store = fsspec.get_mapper(str(tmp_path) + target_store)
            temp_store = fsspec.get_mapper(str(tmp_path) + temp_store)
        else:
            target_store = str(tmp_path / target_store)
            temp_store = str(tmp_path / temp_store)

        a = numpy.arange(numpy.prod(shape)).reshape(shape).astype("f4")
        a[-1] = numpy.nan
        ds = xarray.Dataset(
            dict(
                a=xarray.DataArray(
                    a, dims=["x", "y"], attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"}
                ),
                b=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]),
                c=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]),
            ),
            coords=dict(
                cx=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]),
                cy=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]),
            ),
            attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"},
        )
        ds = ds.chunk(chunks=dict(zip(["x", "y"], source_chunks)))
        options = dict(
            a=dict(
                compressor=zarr.Blosc(cname="zstd"),
                dtype="int32",
                scale_factor=0.1,
                _FillValue=-9999,
            )
        )
        rechunked = api.rechunk(
            ds,
            target_chunks=target_chunks,
            max_mem=max_mem,
            target_store=target_store,
            target_options=options,
            temp_store=temp_store,
>           executor=executor,
        )

tests/test_rechunk.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
rechunker/api.py:305: in rechunk
    plan = executor.prepare_plan(copy_spec)
rechunker/executors/dask.py:21: in prepare_plan
    return _copy_all(specs)
rechunker/executors/dask.py:96: in _copy_all
    stores_delayed = [_chunked_array_copy(spec) for spec in specs]
rechunker/executors/dask.py:96: in <listcomp>
    stores_delayed = [_chunked_array_copy(spec) for spec in specs]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

spec = CopySpec(read=ArrayProxy(array=dask.array<astype, shape=(100, 50), dtype=int32, chunksize=(10, 50), chunktype=numpy.nd...' (100, 50) int32>, chunks=(10, 50)), write=ArrayProxy(array=<zarr.core.Array '/a' (100, 50) int32>, chunks=(100, 50)))

    def _chunked_array_copy(spec: CopySpec) -> Delayed:
        """Chunked copy between arrays."""
        if spec.intermediate.array is None:
            target_store_delayed = _direct_array_copy(
                spec.read.array, spec.write.array, spec.read.chunks,
            )

            # fuse
            target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
            dsk_fused, _ = fuse(target_dsk)

            return Delayed(target_store_delayed.key, dsk_fused)

        else:
            # do intermediate store
            int_store_delayed = _direct_array_copy(
                spec.read.array, spec.intermediate.array, spec.read.chunks,
            )
            target_store_delayed = _direct_array_copy(
                spec.intermediate.array, spec.write.array, spec.write.chunks,
            )

            # now do some hacking to chain these together into a single graph.
            # get the two graphs as dicts
            int_dsk = dask.utils.ensure_dict(int_store_delayed.dask)
            target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)

            # find the root store key representing the read
            root_keys = []
            for key in target_dsk:
                if isinstance(key, str):
                    if key.startswith("from-zarr"):
                        root_keys.append(key)
>           assert len(root_keys) == 1
E           AssertionError

rechunker/executors/dask.py:74: AssertionError
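For anyone digging in: the assertion fails because `_chunked_array_copy` identifies the read root by scanning the target graph for keys starting with `"from-zarr"`. Those key names are an implementation detail of Dask, so after the HLG refactor no key matches and `root_keys` comes back empty. A minimal illustration (graph contents and key tokens are made up):

```python
# A dask graph is, at bottom, a mapping from keys to tasks. Keys are
# strings or tuples whose first element is a string name such as
# "from-zarr-<token>". Matching roots by string prefix therefore ties
# rechunker to Dask's internal naming scheme.

def find_root_keys(dsk, prefix="from-zarr"):
    """Collect graph keys whose name starts with `prefix`."""
    roots = []
    for key in dsk:
        name = key if isinstance(key, str) else key[0]
        if isinstance(name, str) and name.startswith(prefix):
            roots.append(key)
    return roots

# Key shapes before the Dask 2022.01.1 HLG change (tokens invented):
old_graph = {"from-zarr-abc123": None, ("store-def456", 0, 0): None}
# After the change, the read key no longer carries the expected name:
new_graph = {("array-xyz789", 0, 0): None, ("store-def456", 0, 0): None}

print(find_root_keys(old_graph))  # ['from-zarr-abc123']
print(find_root_keys(new_graph))  # [] -> `assert len(root_keys) == 1` fails
```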
rabernat commented 2 years ago

Thanks for reporting, Tom. This feels vaguely similar to https://github.com/pangeo-forge/pangeo-forge-recipes/issues/259, which we successfully fixed.

In pangeo-forge-recipes, we have copied (and improved upon) the executor framework we are using in rechunker. So fixing this will likely involve bringing rechunker up to speed with those changes.
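One way the updated executors could sidestep this class of breakage is to express the two copy stages with an explicit data dependency instead of splicing graphs by matching key prefixes. A rough sketch with plain task dicts (illustrative only, not necessarily what the fix will look like; the key names and tiny scheduler here are made up):

```python
# Tasks record their execution into `log`, so we can confirm that the
# intermediate copy runs before the target copy.
log = []

def copy_stage(dep, label):
    """Stand-in for a chunked array copy; `dep` exists only for ordering."""
    log.append(label)
    return label

dsk = {
    "copy-intermediate": (copy_stage, None, "source->intermediate"),
    # Naming "copy-intermediate" as an argument creates the dependency,
    # with no inspection of key names required.
    "copy-target": (copy_stage, "copy-intermediate", "intermediate->target"),
}

def tiny_get(dsk, key):
    """Minimal recursive executor (stand-in for a dask scheduler)."""
    func, *args = dsk[key]
    args = [tiny_get(dsk, a) if isinstance(a, str) and a in dsk else a
            for a in args]
    return func(*args)

result = tiny_get(dsk, "copy-target")
print(result)  # intermediate->target
print(log)     # ['source->intermediate', 'intermediate->target']
```

With the dependency encoded as a task argument, any scheduler that walks the graph runs the intermediate copy first, regardless of how Dask names its layers.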

paigem commented 2 years ago

Chiming in to say that I am getting the same AssertionError with Dask 2022.2.1, but did not get it with Dask 2022.1.1. I was running rechunker 0.3.3 just fine on Pangeo Cloud until the recent Docker image update switched from Dask 2022.1.1 to 2022.2.1.

rabernat commented 2 years ago

Hopefully fixed by #112.

hammer commented 2 years ago

@tomwhite is going to track down some example code from @eric-czech to ensure this fix works for us.

paigem commented 2 years ago

PR #112 seems to have fixed my issues! Thanks a bunch @rabernat!

rabernat commented 2 years ago

Thanks @hammer! I've heard from a few folks already, and I'm feeling pretty confident that this issue is fixed. But would love as much feedback as possible. I will probably make a release early next week.

rabernat commented 2 years ago

0.5.0 has been released.