zarr-developers / zarr-python

An implementation of chunked, compressed, N-dimensional arrays for Python.
https://zarr.readthedocs.io

parallel append broken with ProcessSynchronizer #2077

Open · jasonkena opened 3 months ago

jasonkena commented 3 months ago

Zarr version

v2.18.2

Numcodecs version

v0.13.0

Python Version

3.10.11

Operating System

Linux

Installation

pip

Description

Appending to zarr arrays is not safe, even with ProcessSynchronizer.

Steps to reproduce

Code:

import numpy as np
import zarr
from zarr.sync import ProcessSynchronizer, ThreadSynchronizer
from joblib import Parallel, delayed

def func(arr, chunk_size):
    temp = np.random.rand(chunk_size)
    arr.append(temp)
    return np.sum(temp)

def test(backend, synchronizer):
    chunk_size = 10
    num_chunks = 1000

    arr = zarr.open(
        "test.zarr",
        mode="w",
        shape=0,
        chunks=chunk_size,
        dtype="f8",
        synchronizer=synchronizer,
    )

    # Parallel execution
    res = Parallel(n_jobs=4, backend=backend)(
        delayed(func)(arr, chunk_size) for _ in range(num_chunks)
    )
    print(f"before reload; res_sum: {np.sum(res)}, arr_sum: {np.sum(arr[:])}")
    arr = zarr.open("test.zarr", mode="r")
    print(f"after reload; res_sum: {np.sum(res)}, arr_sum: {np.sum(arr[:])}")

for backend in ["loky", "multiprocessing", "threading"]:
    for synchronizer in [None, ProcessSynchronizer("zarr.sync"), ThreadSynchronizer()]:
        print(f"backend: {backend}, synchronizer: {synchronizer}")
        test(backend, synchronizer)
        print()

Output:

backend: loky, synchronizer: None
before reload; res_sum: 5020.318571666605, arr_sum: 0.0
after reload; res_sum: 5020.318571666605, arr_sum: 1770.3578683552682

backend: loky, synchronizer: <zarr.sync.ProcessSynchronizer object at 0x7f937cb43820>
before reload; res_sum: 5022.703973367183, arr_sum: 0.0
after reload; res_sum: 5022.703973367183, arr_sum: 504.39059284140575

backend: loky, synchronizer: <zarr.sync.ThreadSynchronizer object at 0x7f937194c1c0>
before reload; res_sum: 4979.000243414224, arr_sum: 0.0
after reload; res_sum: 4979.000243414224, arr_sum: 1357.3665623410047

backend: multiprocessing, synchronizer: None
before reload; res_sum: 5007.585795126917, arr_sum: 0.0
after reload; res_sum: 5007.585795126917, arr_sum: 1396.7038220988911

backend: multiprocessing, synchronizer: <zarr.sync.ProcessSynchronizer object at 0x7f9369e8ea40>
before reload; res_sum: 5002.127967755543, arr_sum: 0.0
after reload; res_sum: 5002.127967755543, arr_sum: 4511.494419049417

backend: multiprocessing, synchronizer: <zarr.sync.ThreadSynchronizer object at 0x7f9369987a00>
before reload; res_sum: 4996.024382756756, arr_sum: 0.0
after reload; res_sum: 4996.024382756756, arr_sum: 1331.2231158658155

backend: threading, synchronizer: None
before reload; res_sum: 4982.832294867826, arr_sum: 4982.832294867825
after reload; res_sum: 4982.832294867826, arr_sum: 4982.832294867825

backend: threading, synchronizer: <zarr.sync.ProcessSynchronizer object at 0x7f9369e8ea40>
before reload; res_sum: 4985.707917015005, arr_sum: 4985.707917015004
after reload; res_sum: 4985.707917015005, arr_sum: 4985.707917015004

backend: threading, synchronizer: <zarr.sync.ThreadSynchronizer object at 0x7f93699fdae0>
before reload; res_sum: 5002.296828527541, arr_sum: 5002.296828527541
after reload; res_sum: 5002.296828527541, arr_sum: 5002.296828527541

Additional output

No response

jasonkena commented 3 months ago

The problem seems to be that the cached metadata is not updated after the shape is resized in another thread/process, leading to dropped rows.
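
To illustrate the diagnosis, here is a minimal sketch (mine, not part of the original report; the path `race.zarr` is arbitrary) in which two handles to the same on-disk array stand in for two worker processes, each with its own metadata cache. As I understand it, even when ProcessSynchronizer serializes the appends, each append still computes its target region from its own stale cached shape:

import numpy as np
import zarr

# Two independent handles to the same store play the role of two worker
# processes, each holding its own cached copy of the .zarray metadata.
a1 = zarr.open("race.zarr", mode="w", shape=0, chunks=10, dtype="f8")
a2 = zarr.open("race.zarr", mode="a")  # second handle, separate cache

a1.append(np.ones(10))       # a1 resizes 0 -> 10 and writes chunk 0
a2.append(np.full(10, 2.0))  # a2 still believes the shape is 0, so it also
                             # "resizes" to 10 and overwrites chunk 0

check = zarr.open("race.zarr", mode="r")
print(check.shape)  # (10,): only one of the two appends survives
print(check[:5])    # all 2.0: a1's rows were silently dropped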

I found two workarounds:

- patching `append` so that metadata is reloaded before the write:

      def fixed_append(arr, data, axis=0):
          def fixed_append_nosync(data, axis=0):
              arr._load_metadata_nosync()
              return arr._append_nosync(data, axis=axis)
          return arr._write_op(fixed_append_nosync, data, axis=axis)

- specifying `cache_metadata=False` to force reloading on every data access (usage sketch after this list)
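
For the `cache_metadata=False` workaround, this is roughly how the repro's open call would change (a sketch; as far as I can tell `zarr.open` forwards `cache_metadata` to `open_array` in v2, and the lock path `test.sync` is arbitrary):

import zarr
from zarr.sync import ProcessSynchronizer

# Disable the metadata cache so every access re-reads .zarray from the
# store and therefore sees resizes performed by other processes.
arr = zarr.open(
    "test.zarr",
    mode="w",
    shape=0,
    chunks=10,
    dtype="f8",
    synchronizer=ProcessSynchronizer("test.sync"),
    cache_metadata=False,
)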

Perhaps the default value for `cache_metadata` should be `False` when `synchronizer` is specified to prevent this behavior?
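
A hypothetical sketch of what that default could look like (the function name and placement are invented for illustration; this is not actual zarr code):

# Hypothetical helper: only cache metadata when no synchronizer is in use.
def resolve_cache_metadata(cache_metadata, synchronizer):
    if cache_metadata is None:       # caller did not set it explicitly
        return synchronizer is None  # default to False when a synchronizer is given
    return cache_metadata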

I believe this resolves these StackOverflow questions:
- https://stackoverflow.com/questions/61929796/parallel-appending-to-a-zarr-store-via-xarray-to-zarr-and-dask
- https://stackoverflow.com/questions/61799664/how-can-one-write-lock-a-zarr-store-during-append
jasonkena commented 3 months ago

Oddly enough, both workarounds fail when working with in-memory zarr arrays (initialized with `zarr.zeros(...)`).
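
A guess at why (my assumption, not established in this thread): an array created with `zarr.zeros(...)` lives in an in-memory store owned by the parent process, and when joblib sends the array to a worker process that store is pickled along with it. Each worker then appends to its own private copy, so neither reloading metadata nor `cache_metadata=False` can help when there is no shared storage at all. A tiny sketch:

import zarr

# The chunks of an in-memory array live in a store local to this process;
# worker processes receive a pickled copy, not a shared handle.
arr = zarr.zeros(0, chunks=10, dtype="f8")
print(type(arr.store))  # an in-memory store, local to this process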

jhamman commented 3 months ago

@jasonkena - thanks for the report. Your diagnosis seems correct, but I'm not sure what we want to do about it. It's quite expensive to always reload metadata to protect against metadata modifications by another writer.

Finally, I should note that we haven't settled on whether or not to keep the synchronizer API around for the 3.0 release (it is not currently included).
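
To make the cost concrete, here is a small sketch (illustrative only; `CountingStore` and the paths are made up) that counts how often `.zarray` is re-read when the metadata cache is disabled:

import numpy as np
import zarr
from zarr.storage import DirectoryStore

class CountingStore(DirectoryStore):
    """DirectoryStore that counts reads of the array metadata key."""

    def __init__(self, path):
        super().__init__(path)
        self.meta_reads = 0

    def __getitem__(self, key):
        if key.endswith(".zarray"):
            self.meta_reads += 1
        return super().__getitem__(key)

store = CountingStore("count.zarr")
arr = zarr.open(store=store, mode="w", shape=100, chunks=10, dtype="f8",
                cache_metadata=False)
arr[:] = np.arange(100)

store.meta_reads = 0
for i in range(10):
    _ = arr[i * 10:(i + 1) * 10]
print(store.meta_reads)  # expect roughly one extra .zarray read per data access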

zoj613 commented 1 month ago

> @jasonkena - thanks for the report. Your diagnosis seems correct, but I'm not sure what we want to do about it. **It's quite expensive to always reload metadata to protect against metadata modifications by another writer.**
>
> Finally, I should note that we haven't settled on whether or not to keep the synchronizer API around for the 3.0 release (it is not currently included).

Is there any data you can share to support the bolded?

jhamman commented 1 month ago

> Is there any data you can share to support the bolded?

@zoj613 - not any specific data, but if every chunk I/O op first has to check whether the metadata has changed, you can imagine how that would be expensive. In my view, the bigger issue is actually around consistency. One of the design tradeoffs in Zarr is that by splitting the dataset into many objects/files, you can act concurrently on individual components. The cost of this, however, is that updates must be coordinated among multiple writers. (You might be interested in reading "Consistency Problems with Zarr" in the Arraylake documentation.)
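
For completeness, here is a sketch of the kind of coordination that sidesteps the problem entirely (my own suggestion, not from this thread; the path `coordinated.zarr` and the helper `write_block` are made up): pre-size the array and have each worker write a disjoint, chunk-aligned region, so no append/resize and no synchronizer are needed.

import numpy as np
import zarr
from joblib import Parallel, delayed

chunk_size, num_chunks = 10, 1000

# Size the array up front so workers never have to resize it.
arr = zarr.open("coordinated.zarr", mode="w",
                shape=chunk_size * num_chunks, chunks=chunk_size, dtype="f8")

def write_block(i):
    block = np.random.rand(chunk_size)
    # Each worker writes its own chunk-aligned slice, so no two workers
    # ever touch the same chunk or need to update the array metadata.
    arr[i * chunk_size:(i + 1) * chunk_size] = block
    return np.sum(block)

res = Parallel(n_jobs=4, backend="loky")(
    delayed(write_block)(i) for i in range(num_chunks)
)
print(np.sum(res), np.sum(zarr.open("coordinated.zarr", mode="r")[:]))
# the two sums should agree (up to floating-point rounding)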