pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0

add option to open_mfdataset for not using dask #3386

Closed · sipposip closed this issue 2 years ago

sipposip commented 5 years ago

open_mfdataset only works with dask, whereas with open_dataset one can choose whether or not to use dask. It would be nice to have an option (e.g. use_dask=False) to not use dask.

My specific use case is the following: I use netCDF data as input for a TensorFlow/Keras application, with parallel preprocessing threads in Keras. When using dask arrays this gets complicated, because both dask and TensorFlow work with threads. I do not need any of the processing capabilities of dask/xarray; I only need a lazily loaded array that I can slice, and whose slices are loaded the moment they are accessed. My application works nicely with open_dataset (without defining chunks, and thus without dask; the data is accessed slice by slice, so it is never loaded as a whole into memory), and it would be nice to have the same with open_mfdataset. Right now my workaround is to use netCDF4.MFDataset (obviously another workaround would be to concatenate my files into one and use open_dataset). Opening each file separately with open_dataset, and then concatenating them with xr.concat does not work, as this loads the data into memory.
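For reference, a minimal sketch of that MFDataset workaround (the file pattern is illustrative; only the requested slice is read from disk):

import glob
import netCDF4

# aggregate the files along their unlimited (time) dimension; nothing is read yet
nc = netCDF4.MFDataset(sorted(glob.glob("era5_*.nc")))
msl = nc.variables["msl"]
batch = msl[0:32, :, :]  # only this slice is loaded into memory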

crusaderky commented 5 years ago

@sipposip xarray doesn't use netCDF4.MFDataset; it opens each file with netCDF4.Dataset, wraps it in a dask array, and then concatenates those dask arrays.

Opening each file separately with open_dataset, and then concatenating them with xr.concat does not work, as this loads the data into memory.

This is by design, for the reason above: NetCDF/HDF5 lazy loading means that the data is loaded into a numpy.ndarray on the first operation performed upon it, and that includes concatenation.
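For example (file names are illustrative), both files end up in memory at the concat step:

import xarray as xr

# each dataset is lazily loaded, but backed by plain netCDF variables, not dask
ds1 = xr.open_dataset("era5_2001.nc")
ds2 = xr.open_dataset("era5_2002.nc")

# concatenation operates on the underlying arrays, so both files are read
# into numpy arrays here
combined = xr.concat([ds1, ds2], dim="time")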

I'm aware that threads within threads, threads within processes, and processes within threads cause a world of pain in the form of random deadlocks - I've been there myself. You can completely disable dask threads process-wide with

import dask

dask.config.set(scheduler="synchronous")
...
ds.load()

or as a context manager

with dask.config.set(scheduler="synchronous"):
    ds.load()

or for the single operation:

ds.load(scheduler="synchronous")

Does this address your issue?

sipposip commented 5 years ago

Setting dask.config.set(scheduler="synchronous") globally indeed resolved the threading issues, thanks. However, loading and preprocessing a single time slice of data is ~40% slower with dask and open_mfdataset (with chunks={'time': 1}) than with netCDF4.MFDataset. Is this expected/a known issue? If not, I can try to create a minimal reproducible example.
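Roughly, the comparison I am timing looks like this (a sketch; the file pattern and the fixed time index are illustrative):

import glob
import time

import dask
import netCDF4
import xarray as xr

dask.config.set(scheduler="synchronous")
files = sorted(glob.glob("era5_*.nc"))

# xarray/dask path: one chunk per time step
ds = xr.open_mfdataset(files, combine="by_coords", chunks={"time": 1})
t0 = time.perf_counter()
_ = ds["msl"].isel(time=0).values
print("open_mfdataset:", time.perf_counter() - t0)

# netCDF4 path: lazy aggregated dataset, the slice is loaded on access
nc = netCDF4.MFDataset(files)
t0 = time.perf_counter()
_ = nc.variables["msl"][0, :, :]
print("MFDataset:", time.perf_counter() - t0)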

dcherian commented 5 years ago

It would be useful to see what a single file looks like and what the combined dataset looks like. open_mfdataset can sometimes require some tuning to get good performance.

shoyer commented 5 years ago

netCDF4.MFDataset works on a much more restricted set of netCDF files than xarray.open_mfdataset. I'm not surprised it's a little bit faster, but I'm not sure it's worth the maintenance burden of supporting a separate code path: making a fully featured version of open_mfdataset without dask would be challenging.

Can you simply add more threads in TensorFlow/Keras for loading the data? My other suggestion is to pre-shuffle the data on disk, so you don't need random access inside your training loop.
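A rough sketch of the first suggestion, assuming the data is fed through a keras.utils.Sequence (the class and its names are illustrative; the workers argument to fit/fit_generator controls the number of loader threads):

import numpy as np
from tensorflow import keras

class SliceLoader(keras.utils.Sequence):
    """Serves batches by slicing a lazily loaded array, e.g. ds["msl"]."""

    def __init__(self, variable, batch_size):
        self.variable = variable
        self.batch_size = batch_size

    def __len__(self):
        return len(self.variable) // self.batch_size

    def __getitem__(self, i):
        sl = slice(i * self.batch_size, (i + 1) * self.batch_size)
        x = np.asarray(self.variable[sl])  # the data for this batch is read here
        return x, x  # dummy (input, target) pair

# model.fit(SliceLoader(ds["msl"], batch_size=32), workers=4, use_multiprocessing=False)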

crusaderky commented 5 years ago

@sipposip if your dask graph is resolved straight after the load from disk, you can try disabling the dask optimizer to see if you can squeeze some milliseconds out of load(). You can look up the setting syntax in the dask documentation.
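For example (assuming, as above, that load() forwards its keyword arguments to dask.compute):

ds.load(optimize_graph=False)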

sipposip commented 5 years ago

@dcherian a dump of a single file:

ncdump -hs era5_mean_sea_level_pressure_2002.nc
netcdf era5_mean_sea_level_pressure_2002 {
dimensions:
    longitude = 1440 ;
    latitude = 721 ;
    time = 8760 ;
variables:
    float longitude(longitude) ;
        longitude:units = "degrees_east" ;
        longitude:long_name = "longitude" ;
    float latitude(latitude) ;
        latitude:units = "degrees_north" ;
        latitude:long_name = "latitude" ;
    int time(time) ;
        time:units = "hours since 1900-01-01 00:00:00.0" ;
        time:long_name = "time" ;
        time:calendar = "gregorian" ;
    short msl(time, latitude, longitude) ;
        msl:scale_factor = 0.23025422306319 ;
        msl:add_offset = 99003.8223728885 ;
        msl:_FillValue = -32767s ;
        msl:missing_value = -32767s ;
        msl:units = "Pa" ;
        msl:long_name = "Mean sea level pressure" ;
        msl:standard_name = "air_pressure_at_mean_sea_level" ;

// global attributes:
        :Conventions = "CF-1.6" ;
        :history = "2019-10-03 16:05:54 GMT by grib_to_netcdf-2.10.0: /opt/ecmwf/eccodes/bin/grib_to_netcdf -o /cache/data5/adaptor.mars.internal-1570117777.9045198-23871-11-c8564b6f-4db5-48d8-beab-ba9fef91d4e8.nc /cache/tmp/c8564b6f-4db5-48d8-beab-ba9fef91d4e8-adaptor.mars.internal-1570117777.905033-23871-3-tmp.grib" ;
        :_Format = "64-bit offset" ;
}

@shoyer: thanks for the tip, I think that simply adding more data-loading threads is indeed the best solution.