zarr-developers / zarr-python

An implementation of chunked, compressed, N-dimensional arrays for Python.
https://zarr.readthedocs.io
MIT License

Error reading sharded array past the first inner chunk #2018

Open · darsnack opened 2 months ago

darsnack commented 2 months ago

Zarr version

v3.0.0a0

Numcodecs version

v0.12.1

Python Version

3.12

Operating System

Linux

Installation

Using Poetry

Description

I have an array stored using Zarr v3 in a sharded format where the inner chunk size is 1. Reading past the first chunk results in the error shown below in the MWE. If the chunk size is > 1 (e.g. k), then no errors occur for indices 0 through k - 1, but the same error occurs from index k onwards.

Steps to reproduce

First, create a sharded store:

In [1]: import zarr

In [2]: import numpy as np

In [3]: store = zarr.store.LocalStore("./test.zarr", mode="w")

In [4]: from zarr.codecs import ShardingCodec

In [5]: arr = zarr.create(store=store,
   ...:                   shape=(1000, 1000),
   ...:                   chunk_shape=(20, 1000),
   ...:                   zarr_format=3,
   ...:                   codecs=[ShardingCodec(chunk_shape=(1, 1000))])

In [6]: arr[:] = np.ones((1000, 1000))

Now, attempt to open the store and read a single chunk at a time:

In [1]: import zarr

In [2]: store = zarr.store.LocalStore("./test.zarr", mode="r")

In [3]: arr = zarr.open_array(store=store, zarr_format=3)

In [4]: arr[0].shape
Out[4]: (1000,)

In [5]: arr[1].shape
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[5], line 1
----> 1 arr[1].shape

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/array.py:711, in Array.__getitem__(self, selection)
    709     return self.get_orthogonal_selection(pure_selection, fields=fields)
    710 else:
--> 711     return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/array.py:733, in Array.get_basic_selection(self, selection, out, prototype, fields)
    731     raise NotImplementedError
    732 else:
--> 733     return sync(
    734         self._async_array._get_selection(
    735             BasicIndexer(selection, self.shape, self.metadata.chunk_grid),
    736             out=out,
    737             fields=fields,
    738             prototype=prototype,
    739         )
    740     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/sync.py:92, in sync(coro, loop, timeout)
     89 return_result = next(iter(finished)).result()
     91 if isinstance(return_result, BaseException):
---> 92     raise return_result
     93 else:
     94     return return_result

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/sync.py:51, in _runner(coro)
     46 """
     47 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
     48 exception, the exception will be returned.
     49 """
     50 try:
---> 51     return await coro
     52 except Exception as ex:
     53     return ex

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/array.py:447, in AsyncArray._get_selection(self, indexer, prototype, out, fields)
    439     out_buffer = prototype.nd_buffer.create(
    440         shape=indexer.shape,
    441         dtype=out_dtype,
    442         order=self.order,
    443         fill_value=self.metadata.fill_value,
    444     )
    445 if product(indexer.shape) > 0:
    446     # reading chunks and decoding them
--> 447     await self.metadata.codec_pipeline.read(
    448         [
    449             (
    450                 self.store_path / self.metadata.encode_chunk_key(chunk_coords),
    451                 self.metadata.get_chunk_spec(chunk_coords, self.order, prototype=prototype),
    452                 chunk_selection,
    453                 out_selection,
    454             )
    455             for chunk_coords, chunk_selection, out_selection in indexer
    456         ],
    457         out_buffer,
    458         drop_axes=indexer.drop_axes,
    459     )
    460 return out_buffer.as_ndarray_like()

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:489, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
    483 async def read(
    484     self,
    485     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    486     out: NDBuffer,
    487     drop_axes: tuple[int, ...] = (),
    488 ) -> None:
--> 489     await concurrent_map(
    490         [
    491             (single_batch_info, out, drop_axes)
    492             for single_batch_info in batched(batch_info, self.batch_size)
    493         ],
    494         self.read_batch,
    495         config.get("async.concurrency"),
    496     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:298, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
    291 async def read_batch(
    292     self,
    293     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    294     out: NDBuffer,
    295     drop_axes: tuple[int, ...] = (),
    296 ) -> None:
    297     if self.supports_partial_decode:
--> 298         chunk_array_batch = await self.decode_partial_batch(
    299             [
    300                 (byte_getter, chunk_selection, chunk_spec)
    301                 for byte_getter, chunk_spec, chunk_selection, _ in batch_info
    302             ]
    303         )
    304         for chunk_array, (_, chunk_spec, _, out_selection) in zip(
    305             chunk_array_batch, batch_info, strict=False
    306         ):
    307             if chunk_array is not None:

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:254, in BatchedCodecPipeline.decode_partial_batch(self, batch_info)
    252 assert self.supports_partial_decode
    253 assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin)
--> 254 return await self.array_bytes_codec.decode_partial(batch_info)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:182, in ArrayBytesCodecPartialDecodeMixin.decode_partial(self, batch_info)
    162 async def decode_partial(
    163     self,
    164     batch_info: Iterable[tuple[ByteGetter, SelectorTuple, ArraySpec]],
    165 ) -> Iterable[NDBuffer | None]:
    166     """Partially decodes a batch of chunks.
    167     This method determines parts of a chunk from the slice selection,
    168     fetches these parts from the store (via ByteGetter) and decodes them.
   (...)
    180     Iterable[NDBuffer | None]
    181     """
--> 182     return await concurrent_map(
    183         [
    184             (byte_getter, selection, chunk_spec)
    185             for byte_getter, selection, chunk_spec in batch_info
    186         ],
    187         self._decode_partial_single,
    188         config.get("async.concurrency"),
    189     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/sharding.py:477, in ShardingCodec._decode_partial_single(self, byte_getter, selection, shard_spec)
    474                 shard_dict[chunk_coords] = chunk_bytes
    476 # decoding chunks and writing them into the output buffer
--> 477 await self.codecs.read(
    478     [
    479         (
    480             _ShardingByteGetter(shard_dict, chunk_coords),
    481             chunk_spec,
    482             chunk_selection,
    483             out_selection,
    484         )
    485         for chunk_coords, chunk_selection, out_selection in indexer
    486     ],
    487     out,
    488 )
    489 return out

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:489, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
    483 async def read(
    484     self,
    485     batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
    486     out: NDBuffer,
    487     drop_axes: tuple[int, ...] = (),
    488 ) -> None:
--> 489     await concurrent_map(
    490         [
    491             (single_batch_info, out, drop_axes)
    492             for single_batch_info in batched(batch_info, self.batch_size)
    493         ],
    494         self.read_batch,
    495         config.get("async.concurrency"),
    496     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:320, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
    311 else:
    312     chunk_bytes_batch = await concurrent_map(
    313         [
    314             (byte_getter, array_spec.prototype)
   (...)
    318         config.get("async.concurrency"),
    319     )
--> 320     chunk_array_batch = await self.decode_batch(
    321         [
    322             (chunk_bytes, chunk_spec)
    323             for chunk_bytes, (_, chunk_spec, _, _) in zip(
    324                 chunk_bytes_batch, batch_info, strict=False
    325             )
    326         ],
    327     )
    328     for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
    329         chunk_array_batch, batch_info, strict=False
    330     ):
    331         if chunk_array is not None:

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:237, in BatchedCodecPipeline.decode_batch(self, chunk_bytes_and_specs)
    232     chunk_bytes_batch = await bb_codec.decode(
    233         zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
    234     )
    236 ab_codec, chunk_spec_batch = ab_codec_with_spec
--> 237 chunk_array_batch = await ab_codec.decode(
    238     zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
    239 )
    241 for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]:
    242     chunk_array_batch = await aa_codec.decode(
    243         zip(chunk_array_batch, chunk_spec_batch, strict=False)
    244     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:107, in _Codec.decode(self, chunks_and_specs)
     91 async def decode(
     92     self,
     93     chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]],
     94 ) -> Iterable[CodecInput | None]:
     95     """Decodes a batch of chunks.
     96     Chunks can be None in which case they are ignored by the codec.
     97
   (...)
    105     Iterable[CodecInput | None]
    106     """
--> 107     return await batching_helper(self._decode_single, chunks_and_specs)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:392, in batching_helper(func, batch_info)
    388 async def batching_helper(
    389     func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
    390     batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
    391 ) -> list[CodecOutput | None]:
--> 392     return await concurrent_map(
    393         [(chunk_array, chunk_spec) for chunk_array, chunk_spec in batch_info],
    394         noop_for_none(func),
    395         config.get("async.concurrency"),
    396     )

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
     49 async def concurrent_map(
     50     items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
     51 ) -> list[V]:
     52     if limit is None:
---> 53         return await asyncio.gather(*[func(*item) for item in items])
     55     else:
     56         sem = asyncio.Semaphore(limit)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:405, in noop_for_none.<locals>.wrap(chunk, chunk_spec)
    403 if chunk is None:
    404     return None
--> 405 return await func(chunk, chunk_spec)

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/bytes.py:89, in BytesCodec._decode_single(self, chunk_bytes, chunk_spec)
     87 # ensure correct chunk shape
     88 if chunk_array.shape != chunk_spec.shape:
---> 89     chunk_array = chunk_array.reshape(
     90         chunk_spec.shape,
     91     )
     92 return chunk_array

File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/buffer.py:375, in NDBuffer.reshape(self, newshape)
    374 def reshape(self, newshape: ChunkCoords | Literal[-1]) -> Self:
--> 375     return self.__class__(self._data.reshape(newshape))

ValueError: cannot reshape array of size 2000 into shape (1,1000)

If we try to access arr[2], the error instead complains about reshaping an array of size 3000. In other words, arr[i] seems to read chunks 0 through i (inclusive) instead of the single chunk i, so the decoded size grows as (i + 1) × 1000.
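
As a quick sanity check of this pattern (plain NumPy, nothing zarr-specific), reshaping (i + 1) * 1000 elements into the expected inner chunk shape (1, 1000) reproduces exactly these messages:

    import numpy as np

    # arr[i] appears to decode (i + 1) * 1000 elements instead of 1000
    for i in range(3):
        data = np.ones((i + 1) * 1000)
        try:
            data.reshape((1, 1000))
            print(f"arr[{i}]: reshape ok")
        except ValueError as e:
            print(f"arr[{i}]: {e}")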

Additional output

No response

darsnack commented 2 months ago

This error persists on the v3 branch as well.

d-v-b commented 2 months ago

Thanks for the bug report @darsnack! I'm planning to make some improvements to the sharding test suite this week, so I will hopefully get some time to replicate (and maybe fix) this bug.

darsnack commented 2 months ago

After some debugging, I think I've narrowed down the cause of the issue. Here's a summary:

  1. When building up the shard dict mapping, we call shard_index.get_chunk_slice here, which returns the start and end indices (in bytes) of the inner chunk.
  2. A few lines down, we actually read the bytes from storage.
  3. Eventually, this will hit _get since we are dealing with a local store on disk for each shard.
  4. This logic interprets the byte_range (i.e. the chunk slice from Step 1) as a start index plus a total length to read. This is the bug, since Step 1 produced a start and end index instead.

From this, we get the behavior described in the bug report.
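
To make the arithmetic concrete, here is a toy model of that misinterpretation (assuming the default float64 dtype, so each (1, 1000) inner chunk occupies 8000 bytes; this is not the actual zarr code):

    CHUNK_NBYTES = 1000 * 8  # one (1, 1000) float64 inner chunk

    for i in range(3):
        # what get_chunk_slice returns for inner chunk i:
        start, end = i * CHUNK_NBYTES, (i + 1) * CHUNK_NBYTES
        # the store instead reads `end` bytes starting at `start`,
        # treating the second element as a length:
        bytes_read = end
        print(f"arr[{i}]: decoded {bytes_read // 8} elements, expected 1000")

This prints 1000, 2000, and 3000 elements for i = 0, 1, 2, matching the reshape errors above (index 0 only works by accident, because start is 0 there).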

To fix this, I could either modify get_chunk_slice from Step 1, or make the even more minimal change of computing the total length from the output of get_chunk_slice. I am not sure what the downstream effects of the former would be, since I am not familiar with this codebase. I'm happy to put up a PR with the bug fix, but I'll need some guidance on which fix the maintainers prefer.
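
For illustration, here is a minimal sketch of the second option (a hypothetical helper, not actual zarr code): convert the (start, end) pair from get_chunk_slice into the (offset, length) form that the store-level read expects, at the point where the slice is consumed.

    def to_offset_and_length(chunk_slice):
        # chunk_slice is the (start, end) byte range from get_chunk_slice;
        # the store read path expects (offset, number_of_bytes).
        start, end = chunk_slice
        return (start, end - start)

    # e.g. the second inner chunk with 8000 bytes per chunk:
    assert to_offset_and_length((8000, 16000)) == (8000, 8000)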