JiaweiZhuang / xESMF

Universal Regridder for Geospatial Data
http://xesmf.readthedocs.io/
MIT License

regridder multi threaded dask workers #88

Open NickMortimer opened 4 years ago

NickMortimer commented 4 years ago

I'm trying to interpolate a section from a curvilinear grid. I'm running an acoustic transmission-loss model and need to extract sections from a model at differing bearings from a point. I've written my code as an xarray accessor:

    @dask.delayed
    def _interp(self, ds, bearing, longitude, latitude, maxrange, rangestep, method):
        qlon, qlat, ranges, b = _make_line(longitude, latitude, bearing, maxrange, rangestep)
        ngrid = xr.Dataset(coords={'lat': qlat, 'lon': qlon})
        # one uniquely named weight file per task, to avoid clashes between workers
        regridder = xe.Regridder(ds, ngrid, method,
                                 filename=str(uuid.uuid4()).replace('-', '_') + '.nc')
        sp = regridder(ds)
        # take the diagonal of the regridded field: the points along the section
        sp = sp.isel(lat=xr.DataArray(range(len(ranges)), dims='range', coords={'range': ranges}),
                     lon=xr.DataArray(range(len(ranges)), dims='range'))
        sp['soundspeed'] = _calcsoundspeed_pt(sp.salt, sp.temp, sp.zc)
        sp.zc.values = abs(sp.zc)
        soundsl = sp.sortby(sp.zc)
        soundsl = soundsl.rename({'k': 'depth'})
        soundsl = soundsl.set_index({'depth': 'zc'})
        soundsl.coords['bearing'] = bearing
        soundsl.attrs['rangestep'] = rangestep
        soundsl.attrs['maxrange'] = maxrange
        soundsl.attrs['paramhead'] = self._obj.attrs['paramhead']
        soundsl.attrs['paramfile'] = self._obj.attrs['paramfile']
        soundsl.attrs['method'] = method
        soundsl = soundsl.rename({'lon': 'longitude', 'lat': 'latitude'})
        regridder.clean_weight_file()
        return soundsl

    def soundslice(self, longitude, latitude, bearing, maxrange, rangestep, method='bilinear'):
        ds = self._obj.rename({'longitude': 'lon', 'latitude': 'lat'})
        slices = [self._interp(ds, bstep, longitude, latitude, maxrange, rangestep, method)
                  for bstep in bearing]
        return xr.concat(dask.compute(*slices), dim='bearing')

With a dask cluster running one thread per worker, all is good. With multiple threads per worker, the code executes correctly the first time, but on the second or third run I get:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-8-adb3f6c0b2ed> in <module>
----> 1 sslice = model_run.sel(time=slice('2018-11-01T00:00:00','2018-11-02T00:00:00'))[['temp','salt','botz']].shoc.soundslice(114.5,-31.7,[90],10000,1000)

~/roampy/roampy/shoc.py in soundslice(self, longitude, latitude, bearing, maxrange, rangestep, method)
     60         ds = self._obj.rename({'longitude': 'lon', 'latitude': 'lat'})
     61         slices = [ self._interp(ds,bstep,longitude,latitude,maxrange,rangestep,method)for bstep in bearing]
---> 62         return xr.concat(dask.compute(*slices),dim='bearing')
     63 
     64 

~/miniconda3/envs/dev2/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439 

~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2593                     should_rejoin = False
   2594             try:
-> 2595                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2596             finally:
   2597                 for f in futures.values():

~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1885             else:
   1886                 local_worker = None
-> 1887             return self.sync(
   1888                 self._gather,
   1889                 futures,

~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    777             return future
    778         else:
--> 779             return sync(
    780                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    781             )

~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

~/miniconda3/envs/dev2/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/miniconda3/envs/dev2/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1750                             exc = CancelledError(key)
   1751                         else:
-> 1752                             raise exception.with_traceback(traceback)
   1753                         raise exc
   1754                     if errors == "skip":

~/roampy/roampy/shoc.py in _interp()
     35             qlon,qlat,ranges,b=_make_line(longitude,latitude,bearing,maxrange,rangestep)
     36             ngrid =xr.Dataset(coords={'lat':qlat,'lon':qlon})
---> 37             regridder = xe.Regridder( ds, ngrid,method,filename=str(uuid.uuid4()).replace('-','_')+'.nc')
     38             sp = regridder(ds)
     39             sp = sp.isel(lat=xr.DataArray(range(0,len(ranges)), dims="range", coords={"range": ranges}),

~/miniconda3/envs/dev2/lib/python3.8/site-packages/xesmf/frontend.py in __init__()
    188             self._grid_in, shape_in = ds_to_ESMFlocstream(ds_in)
    189         else:
--> 190             self._grid_in, shape_in = ds_to_ESMFgrid(ds_in,
    191                                                      need_bounds=self.need_bounds,
    192                                                      periodic=periodic

~/miniconda3/envs/dev2/lib/python3.8/site-packages/xesmf/frontend.py in ds_to_ESMFgrid()
     62 
     63     # tranpose the arrays so they become Fortran-ordered
---> 64     grid = esmf_grid(lon.T, lat.T, periodic=periodic)
     65 
     66     if need_bounds:

~/miniconda3/envs/dev2/lib/python3.8/site-packages/xesmf/backend.py in esmf_grid()
     99     # However, they actually need to be set explicitly,
    100     # otherwise grid._coord_sys and grid._staggerloc will still be None.
--> 101     grid = ESMF.Grid(np.array(lon.shape), staggerloc=staggerloc,
    102                      coord_sys=ESMF.CoordSys.SPH_DEG,
    103                      num_peri_dims=num_peri_dims)

~/miniconda3/envs/dev2/lib/python3.8/site-packages/ESMF/util/decorators.py in new_func()
     62 
     63         esmp = esmpymanager.Manager(debug = False)
---> 64         return func(*args, **kwargs)
     65     return new_func
     66 

~/miniconda3/envs/dev2/lib/python3.8/site-packages/ESMF/api/grid.py in __init__()
    383             self._struct = ESMP_GridStruct()
    384             if self.num_peri_dims == 0:
--> 385                 self._struct = ESMP_GridCreateNoPeriDim(self.max_index,
    386                                                        coordSys=coord_sys,
    387                                                        coordTypeKind=coord_typekind)

~/miniconda3/envs/dev2/lib/python3.8/site-packages/ESMF/interface/cbindings.py in ESMP_GridCreateNoPeriDim()
    558     rc = lrc.value
    559     if rc != constants._ESMP_SUCCESS:
--> 560         raise ValueError('ESMC_GridCreateNoPeriDim() failed with rc = '+str(rc)+
    561                         '.    '+constants._errmsg)
    562 

ValueError: ESMC_GridCreateNoPeriDim() failed with rc = 545.    Please check the log files (named "*ESMF_LogFile").

I know this is probably not the usual use case for this package, but it works nicely. Any thoughts on how to just interpolate along a line would also be gratefully received.
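
For the line interpolation itself, I wondered about regridding straight to the 1-D list of points: the traceback above mentions ds_to_ESMFlocstream, so my xesmf version seems to support LocStream output. An untested sketch, where qlon/qlat are the point coordinates from _make_line:

    # Untested sketch: regrid directly to the points along the section
    # instead of building a 2-D output grid and taking its diagonal.
    points = xr.Dataset({'lat': (['location'], qlat),
                         'lon': (['location'], qlon)})
    regridder = xe.Regridder(ds, points, 'bilinear', locstream_out=True)
    line = regridder(ds)  # result has a single 1-D 'location' dimension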

slevang commented 4 years ago

I've run into similar errors using xesmf in parallel applications, and it has usually been an issue with the regridder weight files being stored on disk. You might solve this by keeping the weight file and setting reuse_weights to True.

Any updates on the new API that makes use of in-memory storage for regridder weights?
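
Something along these lines (a rough sketch; the variable names are from the snippet above, and you'd generate the weight file once before fanning out):

    # Rough sketch: one fixed weight filename instead of a uuid-named file
    # per task, reused so ESMF only generates the weights once.
    regridder = xe.Regridder(ds, ngrid, method,
                             filename='bilinear_weights.nc',
                             reuse_weights=True)
    sp = regridder(ds)
    # note: no regridder.clean_weight_file() here, otherwise the next
    # call has nothing to reuse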

kerriegeil commented 3 years ago

I'd be interested in in-memory storage for regridder weights too, as I'm running into the 545 error as well. My use case is regridding ~50 CMIP climate model fields, each on a different grid, to a common grid. I was trying to use dask.delayed to set up the ~50 calls to a regridding function (which creates an xesmf regridder and regrids) and dask.compute to then do all the regridding, but it's a no-go.

I tested with:

    def my_regrid_func(xarraydataset, dst_grid):
        regridder = xe.Regridder(xarraydataset, dst_grid, 'bilinear')
        var_regrid = regridder(xarraydataset['var'])
        outDS = xr.Dataset({'var': var_regrid})
        outDS.to_netcdf(path)

    result1 = dask.delayed(my_regrid_func)(xarraydataset1, dst_grid)
    dask.compute(result1)

    result2 = dask.delayed(my_regrid_func)(xarraydataset2, dst_grid)
    dask.compute(result2)

If there's a solution for embarrassingly parallel xesmf/dask regridding that I haven't found yet, please advise. Thanks!
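
In the meantime, the pattern I'm considering (an untested sketch, which probably needs the in-memory weights mentioned below so the regridders can be shipped to workers) is to build the regridders serially and only apply them inside the dask tasks, since the tracebacks above all come from ESMF grid creation rather than from applying weights:

    import dask
    import xarray as xr
    import xesmf as xe

    # Untested sketch: `datasets`, `dst_grid`, and the output paths are
    # placeholders. Weight generation (the ESMF call that crashes) runs
    # serially on the client; the delayed tasks only apply sparse weights.
    def apply_and_save(regridder, ds, path):
        out = xr.Dataset({'var': regridder(ds['var'])})
        out.to_netcdf(path)

    regridders = [xe.Regridder(ds, dst_grid, 'bilinear') for ds in datasets]
    tasks = [dask.delayed(apply_and_save)(rg, ds, f'regridded_{i}.nc')
             for i, (rg, ds) in enumerate(zip(regridders, datasets))]
    dask.compute(*tasks)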

slevang commented 3 years ago

Check out the new pangeo-data fork of this package, which has a number of updates, including in-memory weight storage.
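
If I remember the new API correctly, it looks roughly like this (untested sketch; ds_in and ds_out stand for your source dataset and target grid):

    import xesmf as xe

    # With in-memory weights, no file is written unless you ask for one,
    # so parallel tasks stop competing over weight files on disk.
    regridder = xe.Regridder(ds_in, ds_out, 'bilinear')
    out = regridder(ds_in)
    regridder.to_netcdf('weights.nc')  # optional: persist weights for reuse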

dbarahon commented 3 years ago

Same problem here. I am using xarray to read very large files from model output. I created a custom generator to read one time step at a time, regrid it, and feed it to a NN model. The first time around it works fine; the second time I get this error:

File "/discover/nobackup/dbarahon/bin/conda/envs/tf_keras/lib/python3.8/site-packages/ESMF/interface/cbindings.py", line 528, in ESMP_GridCreate1PeriDim raise ValueError('ESMC_GridCreate() failed with rc = '+str(rc)+'. '+

These are the relevant parts of the code:

    def train_gen(dts_per_epoch):
        st = 0
        epoch = 1
        while True:
            if st < 1:
                X, y, blcks = get_dts(print_files=True)
            st += 1
            b = np.random.randint(0, blcks - 1, 1)  # select a random block of size batch size
            print('_block', b, '_')
            xb = Xall.data.blocks[b]
            xb1 = xb.compute()
            xb2 = np.asarray(xb1)
            print(xb2.shape)
            yb = yall.data.blocks[b]
            yb1 = yb.compute()
            yb2 = np.asarray(yb1)
            # print(yb2.shape)
            yield (xb2, yb2)
            xb2 = []
            yb2 = []
            print('st_', st)
            if st > dts_per_epoch:
                st = 0

    def get_dts():
        ...
        lat = dat_in['lat'].values
        lon = dat_in['lon'].values
        grid_out = xr.Dataset({'lat': (['lat'], lat), 'lon': (['lon'], lon)})
        regridder = xe.Regridder(Wc, grid_out, 'bilinear', filename='regrid.nc',
                                 periodic=True, reuse_weights=True)
        y = regridder(Wc)
        ...
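
One workaround I may try (untested sketch, names taken from the snippet above): hoist the Regridder out of get_dts() so the ESMF grid objects are created only once per process, and each call just applies the pre-built weights:

    # Untested sketch: build the regridder once, at module level, rather
    # than on every get_dts() call, so ESMF.Grid creation runs only once.
    regridder = xe.Regridder(Wc, grid_out, 'bilinear', filename='regrid.nc',
                             periodic=True, reuse_weights=True)

    def get_dts():
        ...
        y = regridder(Wc)  # applies pre-computed weights; no new ESMF grids
        ...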

wqshen commented 3 years ago

> I've run into similar errors using xesmf in parallel applications, and it has usually been an issue with the regridder weight files being stored on disk. You might solve this by keeping the weight file and setting reuse_weights to True.

Setting reuse_weights=True does not solve this problem completely; it still raises the same error sometimes.

I also installed the pangeo-data fork, and the problem still exists.

This problem really bothers me, because a dask.distributed cluster and xesmf are both tools I want to use together.