zarr-developers / zarr-python

An implementation of chunked, compressed, N-dimensional arrays for Python.
https://zarr.readthedocs.io
MIT License

ProcessSynchronizer not write safe #857

Open oftfrfbf opened 2 years ago

oftfrfbf commented 2 years ago

Minimal, reproducible code sample:

import zarr; import numpy as np; import os
from multiprocessing import Process
import numcodecs; numcodecs.blosc.use_threads = False

synchronizer = zarr.sync.ProcessSynchronizer('foo.sync')
z = zarr.open_array(
    store='example.zarr',
    mode='w',
    shape=(12),
    chunks=(4),
    fill_value=0,
    compressor=None,
    dtype='i4',
    synchronizer=synchronizer
)
print(z[...])  # show the initial state (all zeros)

def write_across_chunks(indices):
    np.random.seed()
    value = np.random.randint(0, 100)
    zarrMultiprocessor = zarr.open_array('example.zarr', mode='r+', synchronizer=synchronizer)
    zarrMultiprocessor[indices[0]:indices[1]] = value # write random number across 4 elements and 2 chunks
    print(f'ProcessID: {os.getpid()}, Indices: {indices}, Value: "{value}", Zarr: [{zarrMultiprocessor[...]}]')

# simulate a number of processes, each writing across two chunks at a time
overlap_1_2 = [2, 6]
overlap_2_3 = [6, 10]
indices = [
    overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3,
    overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3,
    overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3,
    overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3,
    overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3, overlap_1_2, overlap_2_3,
]

processes = []
for index in indices:
    process = Process(target=write_across_chunks, args=[index])
    processes.append(process)
    process.start()

for i, process in enumerate(processes):
    process.join()

Output:

ProcessID: 42649, Indices: [2:6], Value: "27", Zarr: [[ 0  0 27 27 27 27  0  0  0  0  0  0]]
ProcessID: 42652, Indices: [6:10], Value: "51", Zarr: [[ 0  0 27 27 27 27 51 51 51 51  0  0]]
ProcessID: 42653, Indices: [2:6], Value: "84", Zarr: [[ 0  0 84 84 84 84 51 51 51 51  0  0]]
ProcessID: 42654, Indices: [6:10], Value: "8", Zarr: [[ 0  0 84 84 84 84 61 61  8  8  0  0]] # problem!
ProcessID: 42656, Indices: [6:10], Value: "61", Zarr: [[ 0  0 10 10 84 84 61 61 61 61  0  0]] # problem!
ProcessID: 42657, Indices: [2:6], Value: "10", Zarr: [[ 0  0 10 10 10 10 61 61 61 61  0  0]]
ProcessID: 42658, Indices: [6:10], Value: "58", Zarr: [[ 0  0 10 10 10 10 58 58 58 58  0  0]]
ProcessID: 42655, Indices: [2:6], Value: "21", Zarr: [[ 0  0 21 21 21 21 58 58 58 58  0  0]]
ProcessID: 42662, Indices: [6:10], Value: "37", Zarr: [[ 0  0 77 77 21 21 37 37 37 37  0  0]] # problem!
ProcessID: 42664, Indices: [6:10], Value: "93", Zarr: [[ 0  0  7  7 21 21  7  7 93 93  0  0]] # problem!
ProcessID: 42650, Indices: [6:10], Value: "7", Zarr: [[ 0  0 52 52 21 21  7  7  7  7  0  0]] # problem!
ProcessID: 42651, Indices: [2:6], Value: "52", Zarr: [[ 0  0 52 52 52 52  7  7  7  7  0  0]]
ProcessID: 42659, Indices: [2:6], Value: "7", Zarr: [[ 0  0 18 18  7  7  7  7  7  7  0  0]] # problem!
ProcessID: 42668, Indices: [6:10], Value: "81", Zarr: [[ 0  0 18 18  7  7 81 81 81 81  0  0]] # problem!
ProcessID: 42669, Indices: [2:6], Value: "90", Zarr: [[ 0  0 90 90 90 90 81 81 81 81  0  0]]
ProcessID: 42667, Indices: [2:6], Value: "61", Zarr: [[ 0  0 61 61 61 61 84 84 84 84  0  0]]
ProcessID: 42660, Indices: [6:10], Value: "84", Zarr: [[ 0  0 61 61 61 61 84 84 84 84  0  0]]
ProcessID: 42661, Indices: [2:6], Value: "77", Zarr: [[ 0  0 61 61 77 77 84 84 84 84  0  0]] # problem!
ProcessID: 42673, Indices: [2:6], Value: "3", Zarr: [[ 0  0  3  3  3  3 84 84 84 84  0  0]]
ProcessID: 42674, Indices: [6:10], Value: "51", Zarr: [[ 0  0 89 89  3  3  1  1 51 51  0  0]] # problem!
ProcessID: 42672, Indices: [6:10], Value: "1", Zarr: [[ 0  0 89 89  3  3  1  1  1  1  0  0]] # problem!
ProcessID: 42663, Indices: [2:6], Value: "18", Zarr: [[ 0  0 89 89 18 18  1  1  1  1  0  0]] # problem!
ProcessID: 42675, Indices: [2:6], Value: "74", Zarr: [[ 0  0 74 74 74 74  1  1  1  1  0  0]]
ProcessID: 42665, Indices: [2:6], Value: "89", Zarr: [[ 0  0 74 74 89 89  1  1  1  1  0  0]] # problem!
ProcessID: 42671, Indices: [2:6], Value: "86", Zarr: [[ 0  0 86 86 86 86  1  1  1  1  0  0]]
ProcessID: 42677, Indices: [2:6], Value: "6", Zarr: [[ 0  0  6  6  6  6 33 33 33 33  0  0]]
ProcessID: 42676, Indices: [6:10], Value: "33", Zarr: [[ 0  0  6  6  6  6 33 33 33 33  0  0]]
ProcessID: 42678, Indices: [6:10], Value: "12", Zarr: [[ 0  0  6  6  6  6 12 12 12 12  0  0]]
ProcessID: 42666, Indices: [6:10], Value: "98", Zarr: [[ 0  0  6  6  6  6 98 98 98 98  0  0]]
ProcessID: 42670, Indices: [6:10], Value: "12", Zarr: [[ 0  0  6  6  6  6 12 12 12 12  0  0]]

Problem description:

I am prototyping for some larger data processing that will happen in the cloud and have been testing Zarr's parallel computing and synchronization capabilities on my local machine. The synchronization tools do not appear to be process-safe in the way I would expect.

The toy example above creates a Zarr store that uses a ProcessSynchronizer to coordinate read and write protection when multiple processes access the same chunk in the store. The code initializes a store with 12 elements and a chunk size of 4, for a total of 3 chunks. I spawn a number of processes, each of which writes across two chunks at a time. My expectation is that each of the print statements above would show a consistent set of numbers across chunks, e.g.:

Initially store is set to all zeros:
read --> [0, 0, 0, 0] [0, 0, 0, 0] [0, 0, 0, 0]

Process A writes ProcessSynchronizer-safe 8's across chunks 1 and 2:
write --> [_, _, 8, 8] [8, 8, _, _] [_, _, _, _]

Process B writes ProcessSynchronizer-safe 9's across chunks 2 and 3:
write --> [_, _, _, _] [_, _, 9, 9] [9, 9, _, _]

Final output:
read --> [0, 0, 8, 8] [8, 8, 9, 9] [9, 9, 0, 0]

Each line marked "problem!" in the output above is a case where the supposedly process-safe writes left inconsistent data across the two chunks.

Additional notes:

Blosc threading is disabled and no compressor is configured for the store. I have seen other issues where that was a problem, so it should not be affecting reads/writes in my example.

I have also tested a threaded version using zarr.ThreadSynchronizer and get similar results.
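
The threaded variant was essentially the same toy example wired to zarr.ThreadSynchronizer and a thread pool; a rough sketch (not the exact code I ran) looks like this:

import zarr; import numpy as np
from concurrent.futures import ThreadPoolExecutor

# sketch of the threaded test: same 12-element array with 3 chunks of 4
sync = zarr.ThreadSynchronizer()
z = zarr.open_array(
    store='example_threads.zarr', mode='w',
    shape=(12,), chunks=(4,), fill_value=0,
    compressor=None, dtype='i4', synchronizer=sync
)

def write_across_chunks(indices):
    value = np.random.randint(0, 100)
    z[indices[0]:indices[1]] = value  # spans two chunks, as in the process example

with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(write_across_chunks, [[2, 6], [6, 10]] * 15))

print(z[...])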

Version and installation information:

oftfrfbf commented 2 years ago

I'll add that if this is an actual bug, I have some bandwidth to help contribute toward fixing it. Please let me know if I can help.

jcafhe commented 2 years ago

Hi, I'm experiencing the same issue on Windows, zarr 2.10.2.

After digging a bit, it appears that locking is done on a per-chunk basis, in the Array._chunk_setitem method. Thus, the Array.__setitem__ method is not atomic. https://github.com/zarr-developers/zarr-python/blob/5c71212ccf5abe9f8a5ab7996c5ce3abbbbd61e2/zarr/core.py#L1991-L1993
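
For reference, the linked method looks roughly like this (a paraphrase from memory, not verbatim): the lock is looked up and held for a single chunk only.

def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
    # paraphrase of zarr.core.Array._chunk_setitem (not verbatim)
    if self._synchronizer is None:
        lock = nolock  # zarr's no-op lock placeholder
    else:
        ckey = self._chunk_key(chunk_coords)
        lock = self._synchronizer[ckey]  # one lock per chunk key
    with lock:
        self._chunk_setitem_nosync(chunk_coords, chunk_selection, value,
                                   fields=fields)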

Maybe it should be done earlier, i.e. somewhere between calling Array.__setitem__ and Array._set_selection. In the sample above, where one tries to mutate data over two adjacent chunks, those two chunks should be locked at the same time, not one after the other, but I have no idea about the implications of doing that.

joshmoore commented 2 years ago

but I have no idea about the implications of this statement.

Ditto. Naively, I assume we'd need to look at a throughput metric. Are either of you up for opening a PR so we can evaluate from a test-passing state?

cc: @alimanfoo

oftfrfbf commented 2 years ago

Are either of you up for opening a PR so we can evaluate from a test-passing state?

@joshmoore I can work on this. I just need to think of a more pragmatic example for a unit test; the code I shared above is too stochastic to be useful.

oftfrfbf commented 2 years ago

Maybe this is acceptable for a test?

I create a matrix of shape (10, 10) with (2, 2) chunks, initialized with zeros.

I then create ten parallel processes; each writes a repeated number along one diagonal of the upper triangle of the matrix. The expected output should look like this:

[[10,  9,  8,  7,  6,  5,  4,  3,  2,  1],
 [ 0, 10,  9,  8,  7,  6,  5,  4,  3,  2],
 [ 0,  0, 10,  9,  8,  7,  6,  5,  4,  3],
 [ 0,  0,  0, 10,  9,  8,  7,  6,  5,  4],
 [ 0,  0,  0,  0, 10,  9,  8,  7,  6,  5],
 [ 0,  0,  0,  0,  0, 10,  9,  8,  7,  6],
 [ 0,  0,  0,  0,  0,  0, 10,  9,  8,  7],
 [ 0,  0,  0,  0,  0,  0,  0, 10,  9,  8],
 [ 0,  0,  0,  0,  0,  0,  0,  0, 10,  9],
 [ 0,  0,  0,  0,  0,  0,  0,  0,  0, 10]]

As the processes write to the store, reads show inconsistencies across chunks, where a diagonal's numbers might not yet be fully populated.

A unit test would assert that matrix diagonals are always uniform. Examples of failed assertions:

[[10  9  0  7  6  5  4  3  2  1] # 5's and 9's are out of sync!
 [ 0 10  9  0  7  6  5  4  3  2]
 [ 0  0 10  0  0  7  6  0  4  3]
 [ 0  0  0 10  0  0  7  6  0  4]
 [ 0  0  0  0 10  0  0  7  6  0]
 [ 0  0  0  0  0 10  0  0  7  6]
 [ 0  0  0  0  0  0 10  0  0  7]
 [ 0  0  0  0  0  0  0 10  0  0]
 [ 0  0  0  0  0  0  0  0 10  0]
 [ 0  0  0  0  0  0  0  0  0 10]]

[[10  9  8  7  6  5  4  3  2  1] # 5's, 8's, and 9's are out of sync!!
 [ 0 10  9  8  7  6  5  4  3  2]
 [ 0  0 10  0  0  7  6  0  4  3]
 [ 0  0  0 10  9  0  7  6  0  4]
 [ 0  0  0  0 10  0  0  7  6  0]
 [ 0  0  0  0  0 10  0  0  7  6]
 [ 0  0  0  0  0  0 10  0  0  7]
 [ 0  0  0  0  0  0  0 10  0  0]
 [ 0  0  0  0  0  0  0  0 10  0]
 [ 0  0  0  0  0  0  0  0  0 10]]

What isn't fully intuitive to me yet is that, even though there is a bit of a delay, writes across chunks always seem to eventually become correct. So, refining my thinking, it might be more appropriate to say that "reading from a Zarr store is not guaranteed to be process-synchronized."

Example Code:

import zarr; import numpy as np; import os
from multiprocess import Process
import numcodecs; numcodecs.blosc.use_threads = False

synchronizer = zarr.sync.ProcessSynchronizer('foo.sync')
z = zarr.open_array(
    store='foo.zarr', mode='w',
    shape=(10, 10), chunks=(2, 2), fill_value=0,
    compressor=None, dtype='i4', synchronizer=synchronizer
)
print(np.fliplr(z[...]))  # show the initial state, flipped so written values appear along diagonals

def write_across_chunks(i):
    zarrMultiprocess = zarr.open_array('foo.zarr', mode='r+', synchronizer=synchronizer)
    zarrMultiprocess.set_coordinate_selection((list(range(i)), list(reversed(range(i)))), [i] * i)
    mm = np.fliplr(zarrMultiprocess[:])
    if not np.all([np.all(mm.diagonal(offset=k)[0] == mm.diagonal(offset=k)) for k in range(mm.shape[1])]):
        print(mm) # TODO: Assertion would fail here.

indices = list(range(1, 11))
processes = []
for index in indices:
    process = Process(target=write_across_chunks, args=[index])
    processes.append(process)
    process.start()

print(f"Processes: {len(processes)}")
for i, process in enumerate(processes):
    process.join()

print(np.fliplr(z[...]))  # final state

Any thoughts on this?

joshmoore commented 2 years ago

Maybe this is acceptable for a test?

If it's showing any failing behavior, then yeah, it would make sense to get that into a PR so we can start trying to pinpoint the issue.

writes across chunks seem to always *eventually* be correct. (emphasis added)

In my mind, this would make sense since that matches the (general) expectations of object storage.

oftfrfbf commented 2 years ago

Thanks @joshmoore. I'll work on forking the repo and adding the test.

chris-allan commented 2 years ago

I think I've managed to convince myself that @jcafhe is 100% correct. That is:

Thus, the Array.__setitem__ method is not atomic.

There is probably room for the documentation to be more explicit but I believe this is consistent with what is stated there, specifically:

Zarr provides support for chunk-level synchronization.

A value assignment that spans two chunks needs more than chunk-level synchronization: it needs multiple-chunk synchronization, and Zarr currently has no support for this. The synchronizer interface only allows for the retrieval of chunk-scoped locks, and looking at all the synchronizer uses in zarr.core, none of them acquires more than one lock at a time. I can certainly see substantial simplicity advantages to this approach.
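
For reference, the synchronizer interface is essentially just __getitem__ keyed by a single chunk key; ProcessSynchronizer is roughly the following (a paraphrase, not verbatim):

import os
import fasteners

class ProcessSynchronizer(object):
    # paraphrase of zarr.sync.ProcessSynchronizer: one file-based lock per chunk key

    def __init__(self, path):
        self.path = path

    def __getitem__(self, item):
        lock_path = os.path.join(self.path, item)
        return fasteners.InterProcessLock(lock_path)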

Consequently, I'd posit that having Zarr provide the concurrency guarantees you appear to want would require substantial refactoring. Unless you really need this functionality and are able to drive an implementation, it is likely going to be substantially easier to observe chunk boundaries algorithmically in the code that uses Zarr.
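
As a concrete (hypothetical) illustration of observing chunk boundaries, a helper along these lines splits a 1-D write so that each piece touches exactly one chunk, which is the granularity the existing locks actually protect; the multi-chunk assignment as a whole still isn't atomic:

# hypothetical helper, not part of zarr: split [start, stop) into sub-slices
# that never straddle a chunk boundary along one axis
def chunk_aligned_slices(start, stop, chunk_size):
    edges = [start]
    boundary = (start // chunk_size + 1) * chunk_size
    while boundary < stop:
        edges.append(boundary)
        boundary += chunk_size
    edges.append(stop)
    return [slice(a, b) for a, b in zip(edges[:-1], edges[1:])]

# e.g. with the 12-element array and chunks of 4 from the original example:
# chunk_aligned_slices(2, 6, 4) -> [slice(2, 4), slice(4, 6)]
# each sub-slice can then be written under a single per-chunk lock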

PS: If "...larger data processing that will happen in the cloud..." is still a medium- or long-term goal and the desired backend is object storage, the ProcessSynchronizer is not going to help you assert even chunk-level synchronization: it depends on a mutually accessible filesystem to provide locking.

oftfrfbf commented 2 years ago

PS: If "...larger data processing that will happen in the cloud..." is still a medium- or long-term goal and the desired backend is object storage, the ProcessSynchronizer is not going to help you assert even chunk-level synchronization: it depends on a mutually accessible filesystem to provide locking.

An Amazon Elastic File System can be shared between Lambdas within a Virtual Private Cloud. The attached image gives a rough overview showing the synchronizer accessible to all lambdas (unless I am misunderstanding something):

[image: aws efs zarr architecture diagram]

Unless you really need this functionality and are able to drive an implementation it is likely going to be substantially easier to observe chunk boundaries algorithmically in your code that uses Zarr.

Yes. Given the toy code I put together and the documentation quoted above, it makes more sense for me as a user to orchestrate writes to the store by some means in addition to the synchronizer.

chris-allan commented 2 years ago

An Amazon Elastic File System can be shared between Lambdas within a Virtual Private Cloud.

Yes, it can; however, that feels like using a sledgehammer to kill a fly. Obviously I'm not intimately aware of your use case, but be really careful with the semantics of EFS and Lambda if you decide to go that route: additional Lambda cold-start time for mounting EFS, EFS performance with small amounts of data, EFS lock quotas, etc.

If it were me, I'd pick one of the myriad Redis-based distributed lock implementations (there is a fully managed Redis product in AWS if you don't want to run your own), which have very well-defined semantics, and implement a synchronizer tailored to my use case.

jcafhe commented 2 years ago

Hi,

Sorry for the late reply. In hindsight, I agree that the current behaviour is consistent with what is described in the documentation:

Zarr arrays have not been designed for situations where multiple readers and writers are concurrently operating on the same array.

Also, after some refactoring of my code I don't use concurrent writes anymore, so right now I'm OK with the current behavior of zarr.

A value assignment that spans two chunks needs more than chunk-level synchronization: it needs multiple-chunk synchronization, and Zarr currently has no support for this. The synchronizer interface only allows for the retrieval of chunk-scoped locks, and looking at all the synchronizer uses in zarr.core, none of them acquires more than one lock at a time. I can certainly see substantial simplicity advantages to this approach.

Some time ago, I tried to monkey-patch the core.Array._set_selection method as a proof of concept, and it worked for this specific use case. Nevertheless, I fully agree with @chris-allan that this should be handled by the user, as it seems to raise a number of new concurrency problems that would need to be solved as well.

Basically, what I tried is just getting all the chunk keys involved in the current set operation and acquiring all of their locks at the "same time". It's a bit tricky when using a lock as a context manager, because entering an unknown number of contexts in the same scope requires a bit of a hack (cf. contextlib.ExitStack, https://stackoverflow.com/a/3025119). It looks like this:

    def _set_selection(self, indexer, value, fields=None):
        ...

        # iterate over chunks in range
        if not hasattr(self.store, "setitems") or self._synchronizer is not None \
                or any(map(lambda x: x == 0, self.shape)):

            # get all chunk keys involved in the set operation
            chunk_keys = []
            if self._synchronizer is not None:
                chunk_keys = [self._chunk_key(chunk_coords) for (chunk_coords, *_) in indexer]

            # acquire the locks by entering multiple contexts
            with contextlib.ExitStack() as stacks:
                for ckey in chunk_keys:
                    stacks.enter_context(self._synchronizer[ckey])

                # iterative approach
                for chunk_coords, chunk_selection, out_selection in indexer:
                    # extract data to store (chunk_value)
                    ...

                    # put data with no sync
                    self._chunk_setitem_nosync(chunk_coords, chunk_selection, chunk_value,
                                               fields=fields)

I must say that this subject is far beyond me, but here are my thoughts. I suspect this approach is too naive, and deadlocks could occur because all the locks cannot be acquired at exactly the same time. E.g., imagine two threads/processes A and B trying to write into two different slices that involve the same two adjacent chunks, chunks 1 and 2, with their respective locks L1 and L2. If the order of lock acquisition is random, we could end up with the following timeline and a possible deadlock:

 |    
 |          *process A*                   *process B*
 |    
 |    write on chunks 1, 2 
 |      |                           write on chunks 1, 2
 |      ├ acquire L1                  |
 |      |                             ├ acquire L2 before L1 for whatever reason
 |      ├ waiting for L2              |
 |                                    ├ waiting for L1
 |  
time
 | 
 V

So IMO, the order of acquisition of the chunk locks should at least be consistent across all threads/processes, or maybe it would be necessary to guard access to these locks with a global lock, which leads to more and more complexity and edge cases.
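
A consistent ordering could be as simple as sorting the chunk keys before entering the contexts; an untested sketch:

import contextlib

# untested sketch: acquire per-chunk locks in a globally consistent (sorted)
# order so that two writers contending for the same chunks cannot deadlock
def acquire_chunk_locks(synchronizer, chunk_keys):
    stack = contextlib.ExitStack()
    for ckey in sorted(chunk_keys):  # same order in every thread/process
        stack.enter_context(synchronizer[ckey])
    return stack

# inside _set_selection this would be used as:
# with acquire_chunk_locks(self._synchronizer, chunk_keys):
#     ... call _chunk_setitem_nosync for each chunk ...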

chris-allan commented 2 years ago

Thanks, @jcafhe.

Certainly the general consensus on how to deal with deadlock in multiple-lock-acquisition scenarios like the one you mention is to formalize the lock acquisition order. However, the fact that the ProcessSynchronizer locks are all file based adds a series of other complexities, some of which are operating-system or filesystem (local or network) specific.

I'm certainly not saying these problems cannot be addressed; rather, I wholeheartedly agree with your statement: "...leads to more and more complexity...". Concurrency in a distributed system is a hard problem, and the complexity it adds to software attempting to address concurrent programming use cases is significant.

ljstrnadiii commented 2 years ago

If it were me, I'd pick one of the myriad Redis-based distributed lock implementations (there is a fully managed Redis product in AWS if you don't want to run your own), which have very well-defined semantics, and implement a synchronizer tailored to my use case.

I am just getting started with zarr, xarray, and dask on Kubernetes and have been considering using the Redlock implementation in pottery. Just chiming in since this seems kind of critical for anyone writing to zarr from dask in a distributed setting. I know it is probably not ideal to use a PVC for locking.

Something like this:

from pottery import Redlock

class RedLockSynchronizer(object):
    """Provides synchronization using Redlock."""

    def __init__(self, masters, auto_release_time):
        self.masters = masters
        self.auto_release_time = auto_release_time

    def __getitem__(self, item):
        # one distributed lock per chunk key
        lock = Redlock(
            key=item,
            auto_release_time=self.auto_release_time,
            masters=self.masters
        )
        return lock

and I am simply setting up Redis with Helm:

helm repo add dandydev https://dandydeveloper.github.io/charts
helm install redis dandydev/redis-ha
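
I'd then expect to wire it in roughly like this (a sketch with a hypothetical Redis host; untested):

# hypothetical wiring, untested: assumes a Redis instance reachable at "redis-ha"
# and that auto_release_time is in the units expected by the installed pottery version
import zarr
from redis import Redis

masters = {Redis(host="redis-ha", port=6379)}
synchronizer = RedLockSynchronizer(masters=masters, auto_release_time=30_000)
z = zarr.open_array('example.zarr', mode='r+', synchronizer=synchronizer)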

Any immediate concerns with this approach @chris-allan ?

rabernat commented 2 years ago

If you are using a Dask distributed cluster, you can use a distributed Lock to solve this problem. Communication between processes is handled by the Dask scheduler.

In the Pangeo Forge project, we use this to manage distributed, potentially overlapping writes to Zarr stores in object storage: https://github.com/pangeo-forge/pangeo-forge-recipes/blob/master/pangeo_forge_recipes/utils.py#L86

Our implementation is somewhat specific to our use case, but it could easily be generalized.
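
A generalized version might look roughly like this (a sketch, not the pangeo-forge implementation; it assumes a running dask.distributed Client):

from distributed import Lock

class DaskLockSynchronizer(object):
    # sketch: one distributed.Lock per chunk key, with lock state held by the
    # Dask scheduler; requires an active dask.distributed Client
    def __getitem__(self, item):
        return Lock(item)

Passing an instance of this as synchronizer= to zarr.open_array serializes access per chunk across Dask workers, with the same per-chunk (not cross-chunk) scope discussed above.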