CNES / zcollection

Python library for manipulating data split into a collection of groups stored in Zarr format.
https://zcollection.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Incomplete overlaps with more than one worker #8

Closed: robin-cls closed this issue 1 year ago

robin-cls commented 1 year ago

Hi!

It seems that the update() method with depth > 0 (i.e. with overlaps) behaves differently depending on whether the computation runs on one worker or on more than one worker.

To be more precise, some partitions that should have overlap values on both the left and the right side along the partitioning axis are missing one of the two sides. I will try to illustrate this with a simple test case.

Test case

I am using the simple use case presented in the gallery. It creates an in-memory collection with a monthly partitioning.

### IMPORTS
from __future__ import annotations

import datetime
import pprint

import dask.distributed
import fsspec
import numpy

import zcollection
import zcollection.tests.data

### ZCOLLECTION CREATION
def create_dataset():
    """Create a dataset to record."""
    generator = zcollection.tests.data.create_test_dataset_with_fillvalue()
    return next(generator)

ds = create_dataset()
ds.to_xarray()

fs = fsspec.filesystem('memory')

cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')

collection = zcollection.create_collection('time',
                                           ds,
                                           partition_handler,
                                           '/my_collection',
                                           filesystem=fs)

collection.insert(ds)
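With `resolution='M'`, the `Date` partitioning groups the rows by calendar month. As a rough illustration of that grouping (a NumPy sketch, not zcollection's own implementation; the helper name `monthly_partitions` is hypothetical and the time axis is assumed to be sorted), the rows belonging to each month can be located like this:

```python
import numpy


def monthly_partitions(times):
    """Map each calendar month to the slice of rows it covers.

    Hypothetical helper: assumes ``times`` is a sorted datetime64 array.
    """
    months = times.astype("datetime64[M]")
    # First row index of each distinct month (months are contiguous when sorted).
    keys, starts = numpy.unique(months, return_index=True)
    ends = numpy.append(starts[1:], len(times))
    return {str(key): slice(int(start), int(end))
            for key, start, end in zip(keys, starts, ends)}


times = numpy.array(["2000-01-01", "2000-01-31", "2000-02-03", "2000-03-01"],
                    dtype="datetime64[D]")
print(monthly_partitions(times))
```

Each key plays the role of one monthly partition; with `depth=1`, an update of one month should also see the months immediately before and after it.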

I then define a callback that simply prints the partition currently being updated (given by the partition_info argument) and the extent of the dataset available for the update.

### UPDATE
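def callback(zds, partition_info=None):
    # With depth > 0, zds contains the partition being updated plus its
    # neighbours; partition_info[1] selects the rows of the partition itself.
    rows = partition_info[1] if partition_info is not None else slice(None)
    updated_partition = zds["time"].values[rows].astype('<M8[D]')
    print(f'Update: [{updated_partition[0]}, {updated_partition[-1]}] - '
          f'Overlap extent [{zds["time"].values[0]}, {zds["time"].values[-1]}]')
    # The callback does not update any variable.
    return {}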

When using only one worker in the local cluster, every partition sees the expected dataset extent. The partitions covering the oldest and the most recent periods lack the overlap on one side, but this is expected. Also note that the first partition appears twice, because zcollection first calls the callback on it to infer which fields the callback updates.

### ONLY ONE WORKER
cluster.scale(1)
collection.update(callback, selected_variables=["time"], depth=1)
>> Update: [2000-01-01, 2000-01-31] - Overlap extent [2000-01-01T00:00:00.000000, 2000-01-31T00:00:00.000000]
>> Update: [2000-01-01, 2000-01-31] - Overlap extent [2000-01-01T00:00:00.000000, 2000-02-27T00:00:00.000000]
>> Update: [2000-02-03, 2000-02-27] - Overlap extent [2000-01-01T00:00:00.000000, 2000-03-31T00:00:00.000000]
>> Update: [2000-03-01, 2000-03-31] - Overlap extent [2000-02-03T00:00:00.000000, 2000-04-30T00:00:00.000000]
>> Update: [2000-04-03, 2000-04-30] - Overlap extent [2000-03-01T00:00:00.000000, 2000-05-30T00:00:00.000000]
>> Update: [2000-05-03, 2000-05-30] - Overlap extent [2000-04-03T00:00:00.000000, 2000-06-29T00:00:00.000000]
>> Update: [2000-06-02, 2000-06-29] - Overlap extent [2000-05-03T00:00:00.000000, 2000-06-29T00:00:00.000000]
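With `depth=1`, each partition should see its own rows plus one neighbouring partition on each side, clipped at both ends of the collection. The sketch below recomputes those expected extents from the partition boundaries printed above. It is an illustration only; `expected_overlap_extents` is a hypothetical helper, not part of zcollection.

```python
# Ordered (first, last) dates of each monthly partition, taken from the log.
BOUNDS = [
    ("2000-01-01", "2000-01-31"),
    ("2000-02-03", "2000-02-27"),
    ("2000-03-01", "2000-03-31"),
    ("2000-04-03", "2000-04-30"),
    ("2000-05-03", "2000-05-30"),
    ("2000-06-02", "2000-06-29"),
]


def expected_overlap_extents(bounds, depth=1):
    """Return the (first, last) extent each partition should see.

    Partition ``i`` should see from the start of partition ``i - depth``
    to the end of partition ``i + depth``, clipped to the collection.
    """
    last_index = len(bounds) - 1
    extents = []
    for i in range(len(bounds)):
        lo = max(0, i - depth)
        hi = min(last_index, i + depth)
        extents.append((bounds[lo][0], bounds[hi][1]))
    return extents


for (first, last), (ov_first, ov_last) in zip(BOUNDS, expected_overlap_extents(BOUNDS)):
    print(f"Update: [{first}, {last}] - Overlap extent [{ov_first}, {ov_last}]")
```

The printed extents carry the same dates as the one-worker run above.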

However, when the cluster is scaled to 2 workers, some central partitions are missing overlap data: e.g. the March partition should see February to April, but only sees February to March. Likewise, the April partition only sees April to May instead of March to May.

### 2 WORKERS
cluster.scale(2)
collection.update(callback, selected_variables=["time"], depth=1)
>> Update: [2000-01-01, 2000-01-31] - Overlap extent [2000-01-01T00:00:00.000000, 2000-01-31T00:00:00.000000]
>> Update: [2000-04-03, 2000-04-30] - Overlap extent [2000-04-03T00:00:00.000000, 2000-05-30T00:00:00.000000] --> WRONG
>> Update: [2000-01-01, 2000-01-31] - Overlap extent [2000-01-01T00:00:00.000000, 2000-02-27T00:00:00.000000]
>> Update: [2000-05-03, 2000-05-30] - Overlap extent [2000-04-03T00:00:00.000000, 2000-06-29T00:00:00.000000]
>> Update: [2000-02-03, 2000-02-27] - Overlap extent [2000-01-01T00:00:00.000000, 2000-03-31T00:00:00.000000] 
>> Update: [2000-06-02, 2000-06-29] - Overlap extent [2000-05-03T00:00:00.000000, 2000-06-29T00:00:00.000000]
>> Update: [2000-03-01, 2000-03-31] - Overlap extent [2000-02-03T00:00:00.000000, 2000-03-31T00:00:00.000000] --> WRONG
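To spot the incorrect partitions without reading the log by eye, the printed lines can be compared against the extents expected for `depth=1`. A minimal sketch, assuming the callback's print format shown above; `find_incomplete_overlaps` is a hypothetical helper:

```python
import re

# One date, optionally followed by a time part such as "T00:00:00.000000".
DATE = r"(\d{4}-\d{2}-\d{2})[^,\]]*"
LINE = re.compile(rf"Update: \[{DATE}, {DATE}\] - Overlap extent \[{DATE}, {DATE}\]")


def find_incomplete_overlaps(log, depth=1):
    """Return the partitions whose printed overlap extent differs from
    the expected one. ISO dates compare correctly as plain strings."""
    observed = {}
    for match in LINE.finditer(log):
        first, last, ov_first, ov_last = match.groups()
        # The first partition is printed twice (once for the inference call),
        # so keep the widest extent seen for each partition.
        prev = observed.get((first, last), (ov_first, ov_last))
        observed[(first, last)] = (min(prev[0], ov_first), max(prev[1], ov_last))

    bounds = sorted(observed)
    incomplete = []
    for i, partition in enumerate(bounds):
        lo = max(0, i - depth)
        hi = min(len(bounds) - 1, i + depth)
        if observed[partition] != (bounds[lo][0], bounds[hi][1]):
            incomplete.append(partition)
    return incomplete
```

Fed the two-worker log, it should flag exactly the March and April partitions marked WRONG; fed the one-worker log, it should return an empty list.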

It looks as if the update has been split between the workers and each worker does not see the partitions handled by the others when building the overlaps. I did not test with more workers, but I would expect more errors as the number of workers grows.
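This hypothesis can be modelled in a few lines: split the ordered partitions into contiguous batches, one per worker, and compute the overlaps inside each batch only. This is a hypothetical model of the suspected behaviour, not zcollection's actual scheduling code; the function names are illustrative.

```python
# Ordered (first, last) dates of each monthly partition, taken from the log.
BOUNDS = [
    ("2000-01-01", "2000-01-31"),
    ("2000-02-03", "2000-02-27"),
    ("2000-03-01", "2000-03-31"),
    ("2000-04-03", "2000-04-30"),
    ("2000-05-03", "2000-05-30"),
    ("2000-06-02", "2000-06-29"),
]


def overlap_extents(bounds, depth=1):
    """Extent seen by each partition when all partitions are visible."""
    last_index = len(bounds) - 1
    return [
        (bounds[max(0, i - depth)][0], bounds[min(last_index, i + depth)][1])
        for i in range(len(bounds))
    ]


def overlap_extents_per_worker(bounds, n_workers, depth=1):
    """Hypothetical model: each worker gets a contiguous batch of partitions
    and builds the overlaps from that batch alone."""
    batch_size = -(-len(bounds) // n_workers)  # ceiling division
    extents = []
    for start in range(0, len(bounds), batch_size):
        extents.extend(overlap_extents(bounds[start:start + batch_size], depth))
    return extents


expected = overlap_extents(BOUNDS)
for partition, got, want in zip(BOUNDS, overlap_extents_per_worker(BOUNDS, 2), expected):
    if got != want:
        print(f"{partition[0]}: got {got}, expected {want}")
```

With 2 workers the model reproduces both WRONG lines of the log: March and April sit at the edge of their respective batches, so each loses the neighbour held by the other worker. With 1 worker the model matches the full computation.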

zcollection_version: 2023.3.2

fbriol commented 1 year ago

Fixed in development branch.