zarr-developers / zarr-python

An implementation of chunked, compressed, N-dimensional arrays for Python.
https://zarr.readthedocs.io
MIT License

Sharded array is very slow to load #1343

Closed ska-incom closed 1 month ago

ska-incom commented 1 year ago

Zarr version

'2.13.7.dev9'

Numcodecs version

0.11.0

Python Version

3.10.9

Operating System

Linux

Installation

Pip from main branch

Description

Loading data using the new ShardingStorageTransformer is very slow.

I have a 6 TB dataset that I want to sample from randomly, and I've experienced speed issues that I'm guessing come from the large number of files when the chunk size is small. The new sharding transformer could potentially solve my problem, but so far it is even slower.

Output from the example below:

Type: minimal. Time: 0.737 s
Type: medium. Time: 5.382 s
Type: large. Time: 47.864 s
Type: sharded. Time: 76.932 s

Steps to reproduce

import os

os.environ["ZARR_V3_EXPERIMENTAL_API"] = "1"
os.environ["ZARR_V3_SHARDING"] = "1"

import numpy as np
import zarr
from zarr._storage.v3_storage_transformers import (
    ShardingStorageTransformer,
)
from time import perf_counter
from tempfile import TemporaryDirectory

with TemporaryDirectory() as tmp_dir:
    # %%
    # Create data stores
    shape = (100, 1000, 1000)
    data = np.random.random(shape)
    store = zarr.DirectoryStoreV3(f"{tmp_dir}/example.zarr")
    zgroup = zarr.group(store, overwrite=True, zarr_version=3)

    # Small chunks
    zarr_minimal_chunks = zarr.create(
        shape, chunks=(1, 1000, 1000), path="minimal", zarr_version=3, store=store
    )
    zarr_minimal_chunks[:] = data

    # Medium chunks
    zarr_medium_chunks = zarr.create(
        shape, chunks=(10, 1000, 1000), path="medium", zarr_version=3, store=store
    )
    zarr_medium_chunks[:] = data

    # Large chunks
    zarr_large_chunks = zarr.create(
        shape, chunks=(100, 1000, 1000), path="large", zarr_version=3, store=store
    )
    zarr_large_chunks[:] = data

    # Large sharded chunks
    zarr_sharded_chunks = zarr.create(
        shape,
        chunks=(100, 1000, 1000),
        path="sharded",
        zarr_version=3,
        storage_transformers=[
            ShardingStorageTransformer(
                "indexed",
                chunks_per_shard=(100, 1, 1),
            )
        ],
        store=store,
    )
    zarr_sharded_chunks[:] = data

    # Close store to ensure data is flushed to disk? Probably unnecessary
    store.close()

    def zarr_load_steps(array: zarr.Array, name: str, num_steps: int):
        start = perf_counter()
        for i in range(num_steps):
            array[i]

        print(f"Type: {name}. Time: {perf_counter() - start:.3f} s")

    for name, f in (
        ("minimal", zarr_minimal_chunks),
        ("medium", zarr_medium_chunks),
        ("large", zarr_large_chunks),
        ("sharded", zarr_sharded_chunks),
    ):
        zarr_load_steps(f, name, 100)

Additional output

No response

rabernat commented 1 year ago

Hi @ska-incom and thanks for reporting your issue!

Just to set some expectations here, the sharded format is extremely new and experimental and has not been released yet (only available on master). No performance optimization has been done yet.

We would welcome your help on diagnosing and optimizing performance. As a first step, I always try to generate a snakeviz visualization of my code to understand where it is spending its time.
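
For example, a minimal profiling sketch along those lines (this assumes it is appended to the reproduction script above so that zarr_load_steps and zarr_sharded_chunks are in scope; snakeviz is a separate package, and sharded.prof is just an illustrative filename):

import cProfile
import pstats

# Profile only the sharded reads from the reproduction script and dump the
# stats to a file that snakeviz can open.
with cProfile.Profile() as profiler:
    zarr_load_steps(zarr_sharded_chunks, "sharded", 100)
profiler.dump_stats("sharded.prof")

# Quick text summary; for the interactive view, run `snakeviz sharded.prof` in a shell.
pstats.Stats("sharded.prof").sort_stats("cumulative").print_stats(15)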

rabernat commented 1 year ago

It also might be worth comparing your results with my preliminary benchmarking. In that, I reached an opposite conclusion; access to a single element from the sharded array was very fast (but writing was very slow). There is probably something to be learned in understanding how we reached different conclusions.

ska-incom commented 1 year ago

Thanks, I'll take a look and dig a bit deeper.

rabernat commented 1 year ago

I played around a bit with your example, and I concluded that Zarr is not recognizing that your array is sharded, i.e.

zarr_sharded_chunks.chunks == (100, 1000, 1000)

That is the shard, not the chunk. Chunks should be (1, 1000, 1000) based on your configuration. You may need to re-open the array in order for it to be parsed correctly. I tried playing around with it but couldn't get it to work. But if you compare to my example, you'll see that, with my sharded array, the internal chunks match the chunks Zarr sees.
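
For illustration, re-opening would look something like the sketch below (again, I couldn't get this to report the expected chunk shape, so treat it as a direction rather than a verified fix; the variable names come from the reproduction script):

# Hypothetical: re-open the array from the store so the sharding transformer
# metadata is parsed from disk rather than taken from the zarr.create handle.
reopened = zarr.open(store=store, path="sharded", mode="r", zarr_version=3)
print(reopened.chunks)  # check which chunk shape zarr reports after re-opening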

ska-incom commented 1 year ago

That is probably the issue, but I'm also seeing it with your example, i.e. arr_sh.chunks == (200, 200) when it should be (20, 20) given that chunks=small_chunks, but maybe that was a mistake? I also discovered a small error in your test, specifically:

arr_sm = zarr.create(shape, chunks=chunks, dtype=dtype, store=small_chunks_store)

where chunks=chunks should have been chunks=small_chunks. When that error is fixed, loading from arr_sm is fastest, which is probably due to some overhead in the sharded array.

%time _ = arr_sm[-1, -1]
# CPU times: user 60 µs, sys: 4.04 ms, total: 4.1 ms
# Wall time: 3.83 ms

%time _ = arr_sh[-1,-1]
# CPU times: user 1.15 ms, sys: 4.53 ms, total: 5.69 ms
# Wall time: 4.8 ms

If I change the chunk size in the sharded example to chunks=chunks, i.e. so that sharding should result in (200, 200) chunks, then the performance is much worse:

%time _ = arr_sh[-1,-1]
# CPU times: user 10.1 ms, sys: 37.7 ms, total: 47.8 ms
# Wall time: 36.4 ms

rabernat commented 1 year ago

where chunks=chunks should have been chunks=small_chunks

Yes, that's a typo (now fixed) that crept in when I copied my code into GitHub. In my actual benchmarks it was small_chunks.

...I'm also getting that from your example, i.e. arr_sh.chunks == (200, 200)

Yes that's actually what I see too... 🤔 However, my performance is good.

At this point I think we could use some advice from @jstriebel on the proper API to be using to access the sharded format.

rabernat commented 1 year ago

And just to clarify, I have confirmed that the way I'm doing it...

sharded_store = DirectoryStoreV3("sharded.zarr")
sharding_transformer = ShardingStorageTransformer("indexed", chunks_per_shard=chunks_per_shard)
arr_sh = zarr.create(shape, chunks=small_chunks, dtype=dtype, store=sharded_store, storage_transformers=[sharding_transformer], overwrite=True)

...does indeed produce the expected 100 chunks per shard, as verified by examining the index footer:

nchunks = chunks_per_shard[0] * chunks_per_shard[1]
index_len = nchunks * 16  # one (offset, nbytes) pair of uint64 values per chunk
with open("sharded.zarr/data/root/c0/0", mode="rb") as fp:
    fp.seek(-index_len, 2)  # the index footer sits at the end of the shard file
    index_bytes = fp.read()

index_data = np.frombuffer(index_bytes, dtype='u8').reshape(nchunks, 2)
index_data
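
As a small follow-up, the parsed index can also be used to count how many chunks the shard actually contains. This assumes the "indexed" format stores one (offset, nbytes) pair of uint64 values per chunk and flags absent chunks with 2**64 - 1, as in the sharding proposal:

# Count populated vs. absent chunks using the index parsed above; absent
# chunks are assumed to be marked with the maximum uint64 value in both fields.
missing = np.all(index_data == np.iinfo("uint64").max, axis=1)
print(f"{nchunks - missing.sum()} of {nchunks} chunks present in this shard")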

hanslovsky commented 1 year ago

I have had a similar experience exploring zarr sharding to minimize inode count and IO when reading small crops from arrays. Here is my scenario (notebook available here):

  1. 4D zarr array with shape about (N, 5, 1_000, 1_000) (N images, 5 channels, about 1 megapixel each). Chunk size is (1, 5, 1_000, 1_000).
  2. Access random small 2D crops across all channels: array[i, :, oy:oy+224, ox:ox+224]
  3. We have our data on a local HPC and would like to:
    1. Minimize IO/deserialization time for 2D crops
    2. Minimize number of inodes/files

Sharding looks like a great option to meet both of those requirements. My hypothesis is that I can minimize IO if the chunk size is sufficiently small and keep the inode count constant using sharding. I benchmarked various chunk sizes with sharding, such that each shard file is (about) as big as a chunk in the original array in (1), e.g. a chunk size of (1, 5, 200, 200) with (1, 1, 5, 5) chunks per shard. I populated the array with random data for this example. I evaluated reads of random crops of size (224, 224) for various chunk sizes and found that the smaller the chunks, the worse the performance. Not using sharding is always faster (n in sharded-n is the chunk size along the last two dimensions of the sharded arrays):

>>> for arr in all_arrs:
    print(f'{arr.name}')
    %time _ = [arr[x, :, sly, slx] for x, (sly, slx) in enumerate(random_slices)]

/unsharded
CPU times: user 12 ms, sys: 27.7 ms, total: 39.8 ms
Wall time: 22.4 ms
/sharded-100
CPU times: user 9.06 ms, sys: 103 ms, total: 112 ms
Wall time: 112 ms
/sharded-200
CPU times: user 3.75 ms, sys: 51.7 ms, total: 55.5 ms
Wall time: 55.7 ms
/sharded-256
CPU times: user 5.68 ms, sys: 49.5 ms, total: 55.2 ms
Wall time: 48.7 ms
/sharded-512
CPU times: user 7.14 ms, sys: 31.5 ms, total: 38.6 ms
Wall time: 31.6 ms

This comes as quite a surprise to me, and I wonder whether I am using sharding incorrectly or whether there is simply room for optimization (I am aware sharding is still experimental).
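
For reference, here is a minimal sketch of how one of the sharded test arrays might be set up with the experimental API used in this thread (the array shape, N=16, and the store path are illustrative; the exact setup is in the linked notebook):

import os

# Required for the experimental v3 API used throughout this thread.
os.environ["ZARR_V3_EXPERIMENTAL_API"] = "1"
os.environ["ZARR_V3_SHARDING"] = "1"

import numpy as np
import zarr
from zarr._storage.v3_storage_transformers import ShardingStorageTransformer

store = zarr.DirectoryStoreV3("crops-example.zarr")  # hypothetical path
arr = zarr.create(
    (16, 5, 1000, 1000),      # (images, channels, height, width)
    chunks=(1, 5, 200, 200),  # small chunks so a 224x224 crop touches few chunks
    zarr_version=3,
    store=store,
    path="sharded-200",
    storage_transformers=[
        # 5x5 chunks per shard along the spatial axes -> one shard file per image,
        # matching the footprint of the original (1, 5, 1_000, 1_000) chunking
        ShardingStorageTransformer("indexed", chunks_per_shard=(1, 1, 5, 5)),
    ],
)
arr[:] = np.random.random(arr.shape)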

I have a few TODOs:

  1. Add a comparison with the same chunk sizes but without sharding.
  2. Look into the prof files I create with cProfile using snakeviz

hanslovsky commented 1 year ago

I added TODO (1): comparing the same chunk sizes without sharding. Here is what I found (output shared below):

  1. Without sharding, read time can be decreased by more than 50% with smaller chunks. When going too small, read times become slower. I figure this is a combination of chunking overhead and sub-optimal compression ratios for really small block sizes.
  2. For sharding, read time is consistently slower with smaller chunk size.

/unsharded-1024
CPU times: user 11.6 ms, sys: 29.8 ms, total: 41.4 ms
Wall time: 23.9 ms
/unsharded-512
CPU times: user 4.97 ms, sys: 19.3 ms, total: 24.3 ms
Wall time: 17.9 ms
/unsharded-256
CPU times: user 3.38 ms, sys: 11.7 ms, total: 15.1 ms
Wall time: 9.46 ms
/unsharded-200
CPU times: user 2.25 ms, sys: 9.2 ms, total: 11.5 ms
Wall time: 11.5 ms
/unsharded-100
CPU times: user 6.7 ms, sys: 10.1 ms, total: 16.8 ms
Wall time: 16.8 ms
/sharded-512
CPU times: user 6.75 ms, sys: 30.5 ms, total: 37.2 ms
Wall time: 30.4 ms
/sharded-256
CPU times: user 4.81 ms, sys: 42.4 ms, total: 47.2 ms
Wall time: 41.3 ms
/sharded-200
CPU times: user 3.9 ms, sys: 45 ms, total: 48.9 ms
Wall time: 48.8 ms
/sharded-100
CPU times: user 9 ms, sys: 105 ms, total: 114 ms
Wall time: 114 ms

I am not familiar with the internals, but #1355 might be related and helpful for my case. Writing the data is a one-time operation, and after that I only need to read.

jhamman commented 1 month ago

Does someone (maybe @normanrz) want to revisit this issue and decide if we can close it out?

rbavery commented 1 month ago

I gave this a try but got tripped up by an import error for

from zarr._storage.v3_storage_transformers import (
    ShardingStorageTransformer,
)

and then couldn't find guidance in the online docs. The local docs also won't build after a fresh clone, failing with Makefile:12: *** commands commence before first target. Stop.

Is this the kind of issue a first-time contributor could have a go at? I'm keen to help with zarr-python v3 and to understand how to make use of sharding.

normanrz commented 1 month ago

Here is the updated code for zarr-python 3.0.0b0:

import numpy as np
import zarr

from time import perf_counter
from tempfile import TemporaryDirectory

import zarr.codecs
import zarr.storage

with TemporaryDirectory() as tmp_dir:
    # %%
    # Create data stores
    shape = (100, 1000, 1000)
    data = np.random.random(shape)
    store = zarr.storage.LocalStore(tmp_dir, mode="a")

    # Small chunks
    zarr_minimal_chunks = zarr.open(
        store=store, path="minimal.zarr", shape=shape, dtype=data.dtype, chunk_shape=(1, 1000, 1000)
    )
    zarr_minimal_chunks[:] = data

    # Medium chunks
    zarr_medium_chunks = zarr.open(
        store=store, path="medium.zarr", shape=shape, dtype=data.dtype, chunk_shape=(10, 1000, 1000)
    )
    zarr_medium_chunks[:] = data

    # Large chunks
    zarr_large_chunks = zarr.open(
        store=store, path="large.zarr", shape=shape, dtype=data.dtype, chunk_shape=(100, 1000, 1000)
    )
    zarr_large_chunks[:] = data

    # Large sharded chunks
    zarr_sharded_chunks = zarr.open(
        store=store,
        path="sharded.zarr",
        shape=shape,
        chunk_shape=(100, 1000, 1000),
        dtype=data.dtype,
        codecs=[
            zarr.codecs.ShardingCodec(
                chunk_shape=(1, 1000, 1000), codecs=[zarr.codecs.BytesCodec()]
            )
        ],
    )
    zarr_sharded_chunks[:] = data

    def zarr_load_steps(array: zarr.Array, name: str, num_steps: int):
        start = perf_counter()
        for i in range(num_steps):
            array[i]

        print(f"Type: {name}. Time: {perf_counter() - start:.3f} s")

    for name, f in (
        ("minimal", zarr_minimal_chunks),
        ("medium", zarr_medium_chunks),
        ("large", zarr_large_chunks),
        ("sharded", zarr_sharded_chunks),
    ):
        zarr_load_steps(f, name, 100)

I am getting the following numbers. They seem reasonable to me.

Type: minimal. Time: 0.390 s
Type: medium. Time: 1.387 s
Type: large. Time: 12.539 s
Type: sharded. Time: 0.553 s
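
As an aside, in newer zarr-python 3 releases the same layout can be expressed more compactly; a sketch assuming a release where zarr.create_array exposes a shards= argument (the store path is hypothetical):

# Sketch assuming a newer zarr-python 3 release with zarr.create_array and shards=.
import numpy as np
import zarr

arr = zarr.create_array(
    store="sharded-compact.zarr",  # hypothetical local path
    shape=(100, 1000, 1000),
    dtype="float64",
    chunks=(1, 1000, 1000),    # inner chunks, read and decompressed individually
    shards=(100, 1000, 1000),  # one shard file grouping 100 chunks
)
arr[:] = np.random.random((100, 1000, 1000))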

jhamman commented 1 month ago

👋 @rbavery!

The example above was using an ancient version of sharding... We've come a long way!

I adapted the original example:

import time
from tempfile import TemporaryDirectory
import numpy as np
import zarr
from zarr.storage.logging import LoggingStore
from zarr.codecs import ShardingCodec, TransposeCodec, BytesCodec, BloscCodec

class Timer:
    def __init__(self, hint: str):
        self.hint = hint

    def __enter__(self):
        self.start_time = time.time()  # Record the start time
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end_time = time.time()  # Record the end time
        self.duration = self.end_time - self.start_time  # Calculate the duration
        print(f"{self.hint}: {self.duration:.4f} seconds")

array_kwargs = {
    "minimal": {'chunks': (1, 1000, 1000)},
    "medium": {'chunks': (10, 1000, 1000)},
    "large": {'chunks': (100, 1000, 1000)},
    "sharded": {
        'chunks': (100, 1000, 1000),  # shard shape, same as large; inner chunk_shape below matches minimal
        'codecs': [ShardingCodec(
            chunk_shape=(1, 1000, 1000),
            codecs=[BytesCodec(),],
            index_location="start",
            )],
    }
}

with TemporaryDirectory() as tmp_dir:

    store = zarr.storage.LocalStore(tmp_dir, mode='w')
    # store = LoggingStore(store)
    shape = (100, 1000, 1000)
    data = np.random.random(shape)

    zgroup = zarr.group(store, zarr_format=3)

    for path, kwargs in array_kwargs.items():

        with Timer(f"initialize {path}"):
            arr = zarr.create(
                shape, path=path, zarr_format=3, store=store, **kwargs
            )
        with Timer(f"write {path}"):
            arr[:] = data

        with Timer(f"read {path}"):
            _ = arr[:]

# initialize minimal: 0.0036 seconds
# write minimal: 0.7344 seconds
# read minimal: 0.4661 seconds
# initialize medium: 0.0011 seconds
# write medium: 0.9915 seconds
# read medium: 0.7139 seconds
# initialize large: 0.0014 seconds
# write large: 1.2540 seconds
# read large: 0.5786 seconds
# initialize sharded: 0.0045 seconds
# write sharded: 14.0472 seconds
# read sharded: 0.4677 seconds

☝️ Based on this, the "slow to load" issue that opened this ticket seems to be gone (reading the sharded array is almost exactly the same speed as reading minimal).

jhamman commented 1 month ago

lol -- @normanrz and I both couldn't help ourselves! Simultaneous posts.

normanrz commented 1 month ago

:D Good morning! I agree with your analysis. Let's close this issue.