CNES / zcollection

Python library allowing to manipulate data split into a collection of groups stored in Zarr format.
https://zcollection.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

In the update function, the partition_size argument is not behaving as expected #6

Closed robin-cls closed 1 year ago

robin-cls commented 1 year ago

According to the documentation (screenshot of the `update` docstring attached):

A partition_size set to 1 means that the update function is mapped over each partition individually. However, this does not seem to be the case. The following example uses a local cluster to parallelize a dummy function. Without the partition_size argument, the work is properly spread over 6 workers. However, when setting it to 1, the update runs sequentially on a single worker:

from dask.distributed import LocalCluster
import time
import zcollection
import zcollection.tests.data
import fsspec

ds = next(zcollection.tests.data.create_test_dataset_with_fillvalue())
fs = fsspec.filesystem('memory')
cluster = LocalCluster(processes=False)
client = cluster.get_client()

partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')
collection = zcollection.create_collection('time',
                                           ds,
                                           partition_handler,
                                           '/my_collection',
                                           filesystem=fs)
collection.insert(ds)

def dummy(zds):
    time.sleep(1)
    return dict()

# The computation is properly parallelized
cluster.scale(6)
client.wait_for_workers(6)
n_partitions = len(list(collection.partitions()))
n_workers = len(cluster.workers)
print(f"Number of partitions: {n_partitions}")
print(f"Number of workers: {n_workers}")
%time collection.update(dummy, depth=1)

>> Number of partitions: 6
>> Number of workers: 6
>> CPU times: user 389 ms, sys: 65.3 ms, total: 455 ms
>> Wall time: 2.06 s

# The computation runs sequentially despite the available workers
cluster.scale(6)
client.wait_for_workers(6)
n_partitions = len(list(collection.partitions()))
n_workers = len(cluster.workers)
print(f"Number of partitions: {n_partitions}")
print(f"Number of workers: {n_workers}")
%time collection.update(dummy, depth=1, partition_size=1)

>> Number of partitions: 6
>> Number of workers: 6
>> CPU times: user 1.29 s, sys: 159 ms, total: 1.45 s
>> Wall time: 7.11 s

It seems that partition_size is instead used as the number of batches rather than the number of partitions in each batch: Link batch sequence
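The two interpretations can be contrasted with a small chunking sketch (illustrative only, not zcollection's actual code; the helper names are made up):

```python
from itertools import islice

def batches_of_size(seq, size):
    """Split seq into consecutive batches of at most `size` items each."""
    it = iter(seq)
    while batch := list(islice(it, size)):
        yield batch

def n_batches(seq, n):
    """Split seq into `n` batches of roughly equal length."""
    step = -(-len(seq) // n)  # ceiling division
    return [seq[i:i + step] for i in range(0, len(seq), step)]

partitions = list(range(6))
# Expected reading of partition_size=1: six batches of one partition each.
print(list(batches_of_size(partitions, 1)))  # [[0], [1], [2], [3], [4], [5]]
# Observed behaviour: one batch holding all six partitions.
print(n_batches(partitions, 1))              # [[0, 1, 2, 3, 4, 5]]
```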

What is the intended use of the partition_size argument?
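For reference, the reported wall times are roughly what one would expect if partition_size=1 produced a single batch. A back-of-the-envelope sketch (plain Python, no Dask; the function and its parameters are hypothetical) of the wall time when batches run in parallel across workers but partitions inside a batch run sequentially:

```python
import math

def expected_wall_time(n_partitions, n_batches, n_workers, cost_per_partition=1.0):
    """Estimate wall time assuming batches are spread over workers in
    parallel, while partitions within a batch execute sequentially."""
    per_batch = math.ceil(n_partitions / n_batches) * cost_per_partition
    rounds = math.ceil(n_batches / n_workers)
    return rounds * per_batch

# 6 batches of 1 partition over 6 workers: about 1 s plus scheduling overhead,
# consistent with the ~2 s wall time above.
print(expected_wall_time(6, 6, 6))  # 1.0
# 1 batch of 6 partitions on a single worker: about 6 s,
# consistent with the ~7 s wall time above.
print(expected_wall_time(6, 1, 6))  # 6.0
```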

fbriol commented 1 year ago

When inserting partitions, Dask parallelizes the writing of each partition across its workers. Additionally, the writing of variables within a partition is parallelized on the worker responsible for inserting that partition, using multiple threads. If you're using a single Dask worker, partition insertion will happen sequentially. We'll update the documentation to make this clearer.