pydata / xarray

N-D labeled arrays and datasets in Python
Apache License 2.0
3.56k stars 1.07k forks source link

Drop coordinates on loading large dataset. #1854

Closed jamesstidard closed 4 years ago

jamesstidard commented 6 years ago

I've been struggling for quite a while to load a large dataset so I thought it best ask as I think I'm missing a trick. I've also looked through the issues but, even though there are a fair few questions that seemed promising.

I have a number of *.nc files with variables across the coordinates latitude, longitude and time. Each file has the data for all the latitude and longitudes of the world and then some period of time - about two months.

The goal is to go through that data and get all the history of a single latitude/longitude coordinate - instead of the data for all latitude and longitude for small periods.

This is my current few lines of script:

ds = xr.open_mfdataset('path/to/ncs/*.nc', chunks={'time': 127})  # 127 is normally the size of the time dimension in each file
recs = ds.sel(latitude=10, longitude=10).to_dataframe().to_records()
np.savez('location.npz', recs)

However, this blows out the memory on my machine on the open_mfdataset call when I use the full dataset. I've tried a bunch of different ways of chunking the data (like: 'latitude': 1, 'longitude': 1) but not been able to get past this stage.

I was wondering if there's a way to either determine a good chunk size or maybe tell the open_mfdataset to only keep values from the lat/lng coordinates I care about (coords kwarg looked like it could've been it) .

I'm using version 0.10.0 of xarray

Would very much appreciate any help.

rabernat commented 6 years ago

Can you provide a bit more info about the structure of the individual files?

Open a single file and call, then paste the contents here.

jamesstidard commented 6 years ago

Hi @rabernat, thanks for the response. Sorry it's taken me a few days to get back to you.

Here's the info dump of one of the files:

xarray.Dataset {
    latitude = 361 ;
    longitude = 720 ;
    time = 248 ;

    float32 longitude(longitude) ;
        longitude:units = degrees_east ;
        longitude:long_name = longitude ;
    float32 latitude(latitude) ;
        latitude:units = degrees_north ;
        latitude:long_name = latitude ;
    datetime64[ns] time(time) ;
        time:long_name = time ;
    float64 mwd(time, latitude, longitude) ;
        mwd:units = Degree true ;
        mwd:long_name = Mean wave direction ;

// global attributes:
    :Conventions = CF-1.6 ;
    :history = 2017-08-09 18:15:34 GMT by grib_to_netcdf-2.4.0: grib_to_netcdf /data/data05/scratch/_mars-atls02-70e05f9f8ba4e9d19932f1c45a7be8d8-Pwy6jZ.grib -o /data/data01/scratch/ -utime ;
jamesstidard commented 6 years ago

Sorry to bump this. Still looking to a solution to this problem if anyone has had a similar experience. Thanks.

rabernat commented 6 years ago

I am puzzled by this. Selecting a single point should not require loading into memory the whole dataset.

Can you post the output of repr(ds.sel(latitude=10, longitude=10))?

jamesstidard commented 6 years ago

Sure, this is the repr of a single file:

Dimensions:    (time: 248)
    longitude  float32 10.0
    latitude   float32 10.0
  * time       (time) datetime64[ns] 2004-12-01 2004-12-01T03:00:00 ...
Data variables:
    mwd        (time) float64 dask.array<shape=(248,), chunksize=(248,)>
    Conventions:  CF-1.6
    history:      2017-08-09 16:22:56 GMT by grib_to_netcdf-2.4.0: grib_to_ne...


rabernat commented 6 years ago

No, I meant this:

ds = xr.open_mfdataset('path/to/ncs/*.nc', chunks={'time': 127})
ds_point = ds.sel(latitude=10, longitude=10)

Also, your comment says that "127 is normally the size of the time dimension in each file", but the info you posted indicates that it's 248. Can you also try open_mfsdataset without the chunks argument?

jamesstidard commented 6 years ago

That's true, maybe I misread last time or it's month dependant.

Hopefully this is what you're after - let me know if not. I used 3 *.nc files to make this, with the snippet you posted above.

Dimensions:    (time: 728)
    longitude  float32 10.0
    latitude   float32 10.0
  * time       (time) datetime64[ns] 1992-01-01 1992-01-01T03:00:00 ...
Data variables:
    mwp        (time) float64 dask.array<shape=(728,), chunksize=(127,)>
    Conventions:  CF-1.6
    history:      2017-08-10 04:58:48 GMT by grib_to_netcdf-2.4.0: grib_to_ne...

If you're after the entire dataset, I should be able to get that but may take some time.

rabernat commented 6 years ago

Can you just try your full example without the chunks argument and see if it works any better?

rabernat commented 6 years ago

This sounds similar to #1396, which I thought was resolved (but is still marked as open).

jamesstidard commented 6 years ago

Sure, I'm running that now. I'll reply once/if it finished. Though watching my system monitor memory usage, it does not appear to be growing. I seem to remember the open function continually allocating itself more ram until it was killed.

I'll take a read through that issue while I wait.

rabernat commented 6 years ago

The way this should work is that the selection of a single point should happen before the data is concatenated. It is up to dask to properly "fuse" these two operations. It seems like that is failing for some reason.

As a temporary workaround, you could preprocess the data to only select the specific point before concatenating.

def select_point(ds):
    return ds.sel(latitude=10, longitude=10)
ds = xr.open_mfdataset('*.nc', preprocesses=select_point)

But you shouldn't have to do this to get good performance here.

jhamman commented 6 years ago

@jamesstidard - it would be good to know a few more details here:

jamesstidard commented 6 years ago

That run was killed with the output

~/.pyenv/versions/3.4.6/lib/python3.4/site-packages/xarray/core/ FutureWarning: Conversion of the second argument of issubdtype from `float` to `np.floating` is deprecated. In future, it will be treated as `np.float64 == np.dtype(float).type`.
  if np.issubdtype(dtype, float):

Process finished with exit code 137 (interrupted by signal 9: SIGKILL)

I wasn't watching the machine at the time but I assume that's it falling over to memory pressure.

Hi @jhamman, I'm using 0.10.0 of xarray with dask 0.16.1 and distrobuted 1.18.0. I realise that last one is out of date, I will update and retry.

I'm just using whatever the default scheduler is as that's pretty much all the code I've got written above.

I'm unsure how to do a performance check as the dataset can't even be fully loaded currently. I've tried different chuck sizes in the past hoping to stumble on a magic size, but have been unsuccessful with that.

rabernat commented 6 years ago

Did you try my workaround?

rabernat commented 6 years ago

Also, maybe you can post this dataset somewhere online for us to play around with?

jhamman commented 6 years ago

@jamesstidard - let's see how the distributed scheduler plays:

from distributed import Client
client = Client()

ds = xr.open_mfdataset('path/to/ncs/*.nc', chunks={'latitude': 50, 'longitude': 50}) 
recs = ds.sel(latitude=10, longitude=10).to_dataframe().to_records()

Also, it would be worth updating distributed before you use its scheduler.

jamesstidard commented 6 years ago

I'll give both of those a shot.

For hosting, the files are currently on a local drive and they sum to about 1Tb. I can probably host a couple examples though.

Thanks again for the support.

rabernat commented 6 years ago

@jhamman, chunking in lat and lon should not be necessary here. My understanding is that dask/dask#2364 made sure that the indexing operation happens before the concat.

One possibility is that the files have HDF-level chunking / compression, as discussed in #1440. That could be screwing this up.

jhamman commented 6 years ago

@rabernat - good points.

@jamesstidard - perhaps you can a single files ncdump using the ncdump -h -s syntax. That should tell us how the file is chunked on disk.

jamesstidard commented 6 years ago

@jhamman Here's the ncdump of one of the resource files:

netcdf \34.128_1900_01_05_05 {
        longitude = 720 ;
        latitude = 361 ;
        time = UNLIMITED ; // (124 currently)
        float longitude(longitude) ;
                longitude:units = "degrees_east" ;
                longitude:long_name = "longitude" ;
        float latitude(latitude) ;
                latitude:units = "degrees_north" ;
                latitude:long_name = "latitude" ;
        int time(time) ;
                time:units = "hours since 1900-01-01 00:00:0.0" ;
                time:long_name = "time" ;
                time:calendar = "gregorian" ;
        short sst(time, latitude, longitude) ;
                sst:scale_factor = 0.000552094668668839 ;
                sst:add_offset = 285.983000319853 ;
                sst:_FillValue = -32767s ;
                sst:missing_value = -32767s ;
                sst:units = "K" ;
                sst:long_name = "Sea surface temperature" ;

// global attributes:
                :Conventions = "CF-1.6" ;
                :history = "2017-08-04 06:17:58 GMT by grib_to_netcdf-2.4.0: grib_to_netcdf /data/data05/scratch/_mars-atls09-95e2cf679cd58ee9b4db4dd119a05a8d-gF5gxN.grib -o /data/data04/scratch/ -utime" ;
                :_Format = "64-bit offset" ;

Unfortunately removing the chunks didn't seem to help. I'm running with the pre-process workaround this morning to see if that completes. Sorry for the late response on this - been pretty busy.

jamesstidard commented 6 years ago

@rabernat Still seem to get a SIGKILL 9 (exit code 137) when trying to run with that pre-processor as well.

Maybe my expectations of how it lazy loads files is too high. The machine I'm running on has 8GB or ram and the files in total are just under 1Tb

stale[bot] commented 4 years ago

In order to maintain a list of currently relevant issues, we mark issues as stale after a period of inactivity

If this issue remains relevant, please comment here or remove the stale label; otherwise it will be marked as closed automatically