zarr-developers / VirtualiZarr

Create virtual Zarr stores from archival data files using xarray syntax
https://virtualizarr.readthedocs.io/en/latest/
Apache License 2.0

Trying to run `open_virtual_dataset` in parallel #95

Open jbusecke opened 3 months ago

jbusecke commented 3 months ago

I am trying to build on #93 and reduce the time needed to create the reference file.

My setup is the following:

import dask
from dask.diagnostics import ProgressBar
from tqdm.auto import tqdm
from virtualizarr import open_virtual_dataset
from virtualizarr.kerchunk import FileType
import xarray as xr

files = [
    's3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_185001-186012.nc',
    's3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_187101-188012.nc',
    's3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_188101-189012.nc',
]

To establish a baseline I read each file in serial:

%%time
# load virtual datasets in serial
vds_list = []
for f in files:
    vds = open_virtual_dataset(f, filetype=FileType.netcdf4, indexes={})
    vds_list.append(vds)
CPU times: user 10min 59s, sys: 757 ms, total: 11min
Wall time: 12min 14s

I then naively tried to wrap `open_virtual_dataset` with `dask.delayed`:

%%time
# load virtual datasets in parallel
open_virtual_ = dask.delayed(open_virtual_dataset)
vds_lazy_list = [open_virtual_(f, filetype=FileType.netcdf4, indexes={}) for f in files]
vds_list_parallel = dask.compute(vds_lazy_list)

But I am seeing no speedup at all:

CPU times: user 11min 33s, sys: 3.86 s, total: 11min 37s
Wall time: 12min 37s

I am not sure whether something in the way I am setting this up prevents parallelism, but I would be curious to hear others' opinions on this.

dcherian commented 3 months ago

Multithreading does not work with HDF5/netCDF4: the HDF5 C library holds a global lock, so within a single process all access is serialized. Use the `"processes"` scheduler or a process-heavy distributed cluster.
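
A minimal sketch of what that could look like (the worker counts are illustrative, not a recommendation):

# force a multi-process scheduler when computing the delayed objects
vds_list_parallel = dask.compute(vds_lazy_list, scheduler="processes")

# or use a local distributed cluster with one thread per worker process
from dask.distributed import Client
client = Client(n_workers=8, threads_per_worker=1)
vds_list_parallel = dask.compute(vds_lazy_list)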

jbusecke commented 3 months ago

Ah thanks! Running that now and will report back.

jbusecke commented 3 months ago

vds_list_parallel_processes = dask.compute(vds_lazy_list, scheduler='processes')

Also looks pretty disappointing.

CPU times: user 5.07 s, sys: 1.04 s, total: 6.11 s
Wall time: 11min 14s

Is there something at the fsspec level that locks this? I have to move on from this for now, but a next step would be to run this with local netCDFs to take fsspec out of the picture.

jbusecke commented 3 months ago

Running (EDIT: I executed the two cells below independently; the timing covers only the latter)

import s3fs
fs = s3fs.S3FileSystem(anon=True)
local_files = [file.split('/')[-1] for file in files]
for file, local_file in zip(files, local_files):
    fs.get_file(file, local_file)

%%time
vds_lazy_list_local = [open_virtual_(f, filetype=FileType.netcdf4, indexes={}) for f in local_files]
vds_list_parallel_processes_local = dask.compute(vds_lazy_list_local, scheduler='processes')

was faster!

CPU times: user 3.59 s, sys: 690 ms, total: 4.28 s
Wall time: 8min 24s

TomNicholas commented 3 months ago

Thanks for trying this, @jbusecke!

Presumably `fs.get_file(file, local_file)` being outside the delayed call isn't a significant factor?
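
For concreteness, a sketch of the alternative, i.e. folding the download into each delayed task so both steps are timed and parallelized together (fetch_and_open is just an illustrative name, not part of VirtualiZarr):

@dask.delayed
def fetch_and_open(remote_path):
    # create the filesystem inside the task so nothing is shared across processes
    fs = s3fs.S3FileSystem(anon=True)
    local_path = remote_path.split('/')[-1]
    fs.get_file(remote_path, local_path)
    # open the downloaded copy, as in the cells above
    return open_virtual_dataset(local_path, filetype=FileType.netcdf4, indexes={})

vds_list_local = dask.compute([fetch_and_open(f) for f in files], scheduler='processes')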

jbusecke commented 3 months ago

Oh sorry, this was not posted clearly; I have edited the post above.

dcherian commented 3 months ago

lol now it makes a lot more sense.