funkelab / gunpowder

A library to facilitate machine learning on multi-dimensional images.
https://funkelab.github.io/gunpowder/
MIT License

ZarrWrite upstream of Scan with multiple workers leads to empty patches in written array #159

Open bentaculum opened 3 years ago

bentaculum commented 3 years ago

Below is a minimal example of what I'd like to do. Note that the request size (in voxels) is not a power of 2. The chunk size for the new array is chosen automatically; in this example it is set to (64, 128, 128).

Presumably because the chunk size does not evenly divide the request size, the array on disk ends up with some empty patches, caused by conflicting concurrent write operations on the same chunk.

Can we simply expose the chunk size of a new array in ZarrWrite to avoid this?

import datetime
import numpy as np
import gunpowder as gp

timestamp = datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S")

voxel_size = gp.Coordinate((5,) * 3)
input_size = gp.Coordinate((110,) * 3) * voxel_size

raw = gp.ArrayKey('RAW')

request = gp.BatchRequest()
request.add(raw, input_size)

class Source(gp.BatchProvider):
    def __init__(self, voxel_size, array_key):
        self.shape = gp.Coordinate((512,) * 3) * voxel_size
        self.roi = gp.Roi((0, 0, 0), self.shape)
        self.array_key = array_key

        self.dtype = np.uint8
        data = np.random.randint(0, 256, (512,) * 3, dtype=self.dtype)

        self.manual_spec = gp.ArraySpec(
            roi=self.roi,
            voxel_size=voxel_size,
            dtype=data.dtype,
            interpolatable=True,
        )

        self.array = gp.Array(data, self.manual_spec)

    def setup(self):
        self.provides(self.array_key, self.manual_spec)

    def provide(self, request):
        out = gp.Batch()
        out[self.array_key] = self.array.crop(request[self.array_key].roi)
        return out

pipeline = (
    Source(voxel_size, raw)
    + gp.ZarrWrite(
        dataset_names={raw: 'raw'},
        output_filename=f"{timestamp}.zarr",
    )
    + gp.Scan(reference=request, num_workers=5)
)

with gp.build(pipeline) as p:
    p.request_batch(gp.BatchRequest())
funkey commented 3 years ago

You are right, the "empty" or corrupted chunks are a result of non-chunk-aligned parallel writes by the workers spawned by Scan. This is only a problem if there are multiple workers, though.

What we could do is to issue a warning whenever ZarrWrite detects non-aligned writes. What do you think?
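
Such a check might look like the following sketch (hypothetical helper, not gunpowder's implementation): given a write ROI in voxel coordinates, flag every dimension whose start or size does not fall on a chunk boundary.

```python
import warnings

def warn_if_unaligned(begin_vox, shape_vox, chunk_shape):
    """Warn if a write ROI (in voxels) is not aligned with zarr chunks.

    Returns the list of unaligned dimension indices. Hypothetical sketch
    of the warning funkey proposes, assuming the ROI is already converted
    from world units to voxels.
    """
    unaligned = [
        d for d, (b, s, c) in enumerate(zip(begin_vox, shape_vox, chunk_shape))
        if b % c != 0 or s % c != 0
    ]
    if unaligned:
        warnings.warn(
            f"write ROI not chunk-aligned in dimensions {unaligned}; "
            "parallel workers may corrupt shared chunks"
        )
    return unaligned

# the 110-voxel request from the example is unaligned in all three dimensions
warn_if_unaligned((0, 0, 0), (110, 110, 110), (64, 128, 128))
```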

bentaculum commented 3 years ago

I agree, a warning would be helpful.

Zarr also supports multi-process synchronized writes as an experimental feature, so we might think about introducing it to ZarrWrite. I made a working quick-and-dirty synchronized version for now.