pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0
3.62k stars 1.08k forks source link

open_mfdataset failed to open tarfile filestream when it locates in the context of dask.distributed Client #5043

Open wqshen opened 3 years ago

wqshen commented 3 years ago

Recently, i use open_mfdataset to open a local tar.gz file of multiple netcdf files, it failed to open it and raise a distributed.scheduler.KilledWorker: Error and TypeError: cannot serialize 'ExFileObject' object .

My code is like following,

import tarfile
from dask.distributed import Client

client = Client()

tar = tarfile.open(my_multiple_netcdf_tar_gz_file)
flist = [tar.extractfile(member) for member in tar.getmembers()]

ds = xr.open_mfdataset(flist)

# This line will raise Exception
print(ds.MyNcVar.values)

# ....
# blah blah my other client calcualation codes
# ....

client.close()

In above code, the elements of variable flist will be type of ExFileObject, which can't be serialized to distributed.Client cluster and therefore will result in the failure of open_mfdataset .

The reason is xr.open_mfdataset auto convert chunks=None to {} , which will force the method xr.open_dataset to use dask.

We can see in this line of open_mfdataset ,

https://github.com/pydata/xarray/blob/37fe5441c8a2fb981f2c50b8379d7d4f8492ae19/xarray/backends/api.py#L897

    # Notes this line will force chunks=None into chunks={} and result in the involvement of dask
    open_kwargs = dict(engine=engine, chunks=chunks or {}, **kwargs)

    if parallel:
        import dask

        # wrap the open_dataset, getattr, and preprocess with delayed
        open_ = dask.delayed(open_dataset)
        getattr_ = dask.delayed(getattr)
        if preprocess is not None:
            preprocess = dask.delayed(preprocess)
    else:
        open_ = open_dataset
        getattr_ = getattr

    datasets = [open_(p, **open_kwargs) for p in paths]
    closers = [getattr_(ds, "_close") for ds in datasets]

Even if i set the chunks=None, it will be a error cause the chunks always not be None when it is passed into open_dataset .

I think maybe we can keep the chunks value and if anyone want change it, he or she can set it to {} or any other values as they want ?

    open_kwargs = dict(engine=engine, chunks=chunks, **kwargs)

Or may you have a better solution for my problem ?

Also, Thank You for your great jobs on this excellent package.

max-sixty commented 3 years ago

It seems reasonable to allow open_mfdataset to work without dask — do others who know better agree?

If so, what's the best approach? We could have a default of chunks={}, accepting the python footgun, and then people can pass chunks=None for no dask? Or use a sentinel value.