JiaweiZhuang / xESMF

Universal Regridder for Geospatial Data
http://xesmf.readthedocs.io/
MIT License
269 stars 49 forks source link

Parallel regridding support #3

Closed JiaweiZhuang closed 4 years ago

JiaweiZhuang commented 6 years ago

"Parallel regridding" could mean two things: (see #2 for more about this two-step regridding)

  1. Generate regridding weights in parallel.
  2. Apply regridding weights in parallel.

The native parallel support in ESMF/ESMPy is based on MPI and horizontal domain decomposition. It works for both generating and applying weights. See https://github.com/nawendt/esmpy-tutorial/blob/master/esmpy_mpi_example.py as an example.

MPI-based horizontal domain decomposition makes perfect sense for earth system model simulation, but for data analysis I would absolutely want to avoid MPI's complexity. With dask, there's a simple way to go:

  1. Avoid horizontal domain decomposition and only parallelize over additional dimensions like time and level. Writing such code withdask.array will be trivial.
  2. Still generate regridding weights in serial. My impression is people tend to have a fixed pair of source and destination grids and regrid a lot of data fields between them. In this case, weights generation only needs to be done once. So we would only care about applying the weights in parallel.

Is there any case that we have to parallelize over horizontal dimensions?

PS: Need to profile the regridding on very large data sets and figure out the bottleneck (generating vs applying weights) before starting to implement anything.

bekozi commented 6 years ago

I generally agree with your summary. I added a few comments. It would also be good to get @rokuingh's thoughts.

Still generate regridding weights in serial. My impression is people tend to have a fixed pair of source and destination grids and regrid a lot of data fields between them. In this case, weights generation only needs to be done once.

Yes, mostly. The only caveat is grid size. Very large grids can be prohibitive for serial operations. Dealing with big data is the responsibility of the user (provided the software works in the first place), but excluding the ability to generate weights in parallel using horizontal decompositions would remove important functionality. Especially since grid resolutions are for the most part increasing.

So we would only care about applying the weights in parallel. Is there any case that we have to parallelize over horizontal dimensions?

Again, large grids. There would also be the case of time-varying grid/mesh spatial structures. That is a very special case though! There is also the parallelization scheme used by ESMF's weight application (see below).

PS: Need to profile the regridding on very large data sets and figure out the bottleneck (generating vs applying weights) before starting to implement anything.

Generating the weights and creating the ESMF route handle are the major bottlenecks. Which takes longer depends on the regridding method and source/destination structures. The route handle contains the sparse matrix application optimizations. Hence, once the route handle is created, the weight application is very fast. Note SMM optimizations must use horizontal decompositions (correct me if I'm wrong @rokuingh). It is entirely possible to avoid ESMF's SMM routines and write your own that interprets a weight file. It is worth noting that ESMPy does not expose Python bindings for route handle stuff yet. @rokuingh is very close to to-weight-file/from-weight-file operations...

For large grids, memory usage is also an issue. We've developed a workflow that uses spatial subsetting (to ensure appropriate source/destination overlap) and ESMF to tile weight and SMM operations. We'd like to bring this into ESMF proper at some point.

Anyway, you definitely have a good handle on things. Let us know if you have any questions.

rokuingh commented 6 years ago

I generally agree with @bekozi.

ESMPy does use horizontal decompositions, but we have considered bringing the customizealld decomposition capability of ESMF out to the Python layer. However, this still would not allow the case mentioned by @JiaweiZhuang of distributing over non-gridded dimensions like time and levels, unless the grid was created with enough dimensions to incorporate the non-gridded dimensions, but that would probably turn into a big mess..

The ability to write weights to file and read weight from file in ESMPy is almost finished. I requested time for a development sprint on this from my supervisor yesterday to finish it off, will let you know how that goes.

@bekozi has done a fair amount of profiling of the ESMF SMM code, is that stuff publicly available?

JiaweiZhuang commented 6 years ago

The ability to write weights to file and read weight from file in ESMPy is almost finished. I requested time for a development sprint on this from my supervisor yesterday to finish it off, will let you know how that goes.

Great. That's the currently most important part for me.

JiaweiZhuang commented 6 years ago

Dealing with big data is the responsibility of the user (provided the software works in the first place), but excluding the ability to generate weights in parallel using horizontal decompositions would remove important functionality. Especially since grid resolutions are for the most part increasing.

You are right, but it also depends on actual use cases. For me, the highest priority for xESMF is usability. I want users to be able to perform regridding with one line of code, and I don't specifically target on very large grids. If there's an easy&elegant way to gain speed-up (say dask) I will be happy to go, but I don't want users to call mpirun -- writing and understanding MPI codes is beyond an average earth scientist's skill...

Also, I want xESMF to provide regridding capability for my cubedsphere package and the models it serves (FV3, GEOS-Chem...). The horizontal resolution should be around 50km and there are always many vertical levels and variables, so parallelizing over extra dimensions with dask seems a reasonable way.

Would like to hear from @rabernat @jhamman for your use cases, as you are particularly interested in regridding.

jhamman commented 6 years ago

@JiaweiZhuang - first let me say that the package you've put together looks pretty sweet.

The conundrum you're discussing here is why I stayed away from ESMF (and other existing low-level remapping libraries). I basically could not see a clear path toward integration with the xarray ecosystem (dask). As I'm thinking about it more, I have one idea for how you could make this work using a hybrid approach.

The transform step is fairly straightforward (new = (weights * data ) / sum(weights)) and is generalizable once you have the weights. I would like to have something that exposes this sort of API as it would lend itself to the addition of new methods to calculate mappings/weights outside of ESMF.


As for your actual question, my most common use case is remapping 3d and 4d data between well defined grids (regular, equal area, etc.) using a either conservative or linear remapping schemes. So, in theory, ESMF includes everything I need.

rabernat commented 6 years ago

weights * data is a matrix multiplication, right? And weights is a relatively sparse matrix? If so, perhaps it could be executed using dask's sparse array capabilities (via sparse package).

JiaweiZhuang commented 6 years ago

@jhamman Thanks!

I would like to have something that exposes this sort of API as it would lend itself to the addition of new methods to calculate mappings/weights outside of ESMF.

I totally agree with this kind of interoperability. Current popular remapping packages are ESMF and Tempest. They both output regridding weights in SCRIP format (SCRIP itself was retired). I tried to write a Python-interface for Tempest (written C++) but then realized that ESMPy allows me to do everything in Python... I think the representations of regridding weights are pretty consistent among packages.

As for your actual question, my most common use case is remapping 3d and 4d data between well defined grids (regular, equal area, etc.) using a either conservative or linear remapping schemes.

Glad to see that! Just to point out that the first-order conservative scheme is also a linear mapping. A scheme is linear as long as the weight matrix is independent to the data field, which means we can pre-calculate the weights and apply them to any data. The only non-linear remapping I know is high-order conservative remapping with monotonic constraints, used in semi-Lagrangian advection schemes.

JiaweiZhuang commented 6 years ago

@rabernat

weights * data is a matrix multiplication, right? And weights is a relatively sparse matrix? If so, perhaps it could be executed using dask's sparse array capabilities (via sparse package).

Yes it is very sparse(see #2). The full dense matrix should have the size of _N_s*Nd, where _Ns and _Nd are the numbers of grid points in source and destination grids. But the number of non-zero elements should be at the order of _max(N_s, Nd).

Good to know that dask already has sparse matrix support. It seems that calling sparse.tensordot on dask arrays means executing scipy.sprarse.csr_matrix.dot in parallel... I would try it out.

jhamman commented 6 years ago

cc @mrocklin and @kmpaul

mrocklin commented 6 years ago

My first impression here is that creating a large sparse matrix may not be the right approach here. Instead, we might consider how each block in the input array affects each block in the output array, sum up those effects, and build up a graph of tasks manually. My guess is that this requires a non-trivial amount of blackboard/notepad time to get right.

mrocklin commented 6 years ago

@pelson has also asked about this topic before.

mrocklin commented 6 years ago

If my intuition above is correct then my first instinct would be to consider the action of regridding on a small 10x10 array cut into 5x5 chunks, and work that out by hand.

JiaweiZhuang commented 6 years ago

@mrocklin Thanks for the suggestion! This block-by-block approach looks more like ESMF/ESMPy's original MPI implementation. It is very hard to replicate that in Python so I'll first try a simpler approach.

pelson commented 6 years ago

@cpelley has a huge amount of experience with chunking regridding operations (esp. huge input to moderate output)

mrocklin commented 6 years ago

@mrocklin Thanks for the suggestion! This block-by-block approach looks more like ESMF/ESMPy's original MPI implementation. It is very hard to replicate that in Python so I'll first try a simpler approach.

I'm curious, what makes this hard in Python?

cpelley commented 6 years ago

@cpelley has a huge amount of experience with chunking regridding operations (esp. huge input to moderate output)

I have developed a generalised decomposition framework for the project ANTS (a toolkit for generating ancillaries for the unified model here at the Met Office).

This works by fetching 'pieces' of the target and corresponding overlapping source 'pieces' and sending them along with an operation (whether regrid or otherwise) to any number of processes. Our method abstracts away the decomposition technology utilised and in doing so, allows users to gain the benefits of ipython parallel for multi-machine parallelism, simple single machine parallelism from multiprocessing or just serial decomposition without any coding change to their code and with a simple one-line API.

More information can be found here: https://code.metoffice.gov.uk/doc/ancil/ants/latest/decomposition.html

The reasons for choosing horizontal decomposition are that performing the overlap calculations is incredibly fast and broadcasting over the other dimensions is incredibly fast. So, doing so provides the benefit of hardware utilisation along with control of memory usage while maintaining the speed benefits of numpy broadcasting.

I hope to spend some time looking at the possibility of Dask usage for us in future. A few years ago I deemed it wouldn't help us due to the complex runtime relationship between source and target in the decomposition. Dask seems to have moved along quite a bit now so I suspect the situation could easily have changed...

Works well for us and our users and is a Python based framework, made possible by utilising powerful libraries like iris, cartopy, numpy, shapely etc.

mrocklin commented 6 years ago

The link you provide seems to require a Met Office login. Do you have anything that is publicly available?

cpelley commented 6 years ago

Ah OK, here you go: ants_decomposition.pdf

Hope this is useful in some way.

Cheers

mrocklin commented 6 years ago

Is this open source? If so do you have a link to source code?

On Thu, Nov 2, 2017 at 8:44 AM, Carwyn Pelley notifications@github.com wrote:

Ah OK, here you go: ants_decomposition.pdf https://github.com/JiaweiZhuang/xESMF/files/1437724/ants_decomposition.pdf

Hope this is useful in some way.

Cheers

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/JiaweiZhuang/xESMF/issues/3#issuecomment-341409671, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszGMrZ6Ktq2QYf5eNM_It4e3KVcT5ks5syblDgaJpZM4PIAzP .

cpelley commented 6 years ago

I'm afraid not :(

JiaweiZhuang commented 6 years ago

I'm curious, what makes this hard in Python?

Just realized that it shouldn't be that hard😅 I was thinking about domain decomposition and MPI communication. But here since the weight matrix is already generated, the problem is only about parallelizing a sparse matrix multiplication (with broadcasting). See #6 for the serial implementation.

The choice is either chunking over extra/broadcasting dimensions (time and level) or chunking over the horizontal field. A typical sparse matrix structure is shown below. By chunking over the "output field" dimension we should be able to avoid writing to the same output grid point at the same time. weights

mrocklin commented 6 years ago

Checking in here. Does the latest release manage parallelism across non-grid dimensions like time and level?

mrocklin commented 6 years ago

Assuming the answer above is "yes, this works for simple cases on dask arrays" (which may not be true) then have you tried this out on a distributed system? One might try this by setting up a single-machine dask.distributed cluster with the following lines of Python

from dask.distributed import Client
client = Client()  # starts a few single-threaded worker processes

# normal xarray with dask code
JiaweiZhuang commented 6 years ago

Checking in here. Does the latest release manage parallelism across non-grid dimensions like time and level?

The current version (v0.1.1) only runs in serial, although extra-dimension-parallelism should be quite easy to add (literally just a parallel sparse .dot()) May add an experimental parallel method in the next release.

JiaweiZhuang commented 6 years ago

Assuming the answer above is "yes, this works for simple cases on dask arrays" (which may not be true) then have you tried this out on a distributed system? One might try this by setting up a single-machine dask.distributed cluster with the following lines of Python

Thanks for the suggestion. I haven't needed to regrid any data that is large enough and requires a distributed cluster. If there's any specific research needs I am willing to try.

mrocklin commented 6 years ago

The current version (v0.1.1) only runs in serial, although extra-dimension-parallelism should be quite easy to add (literally just a parallel sparse .dot()) May add an experimental parallel method in the next release.

I think that XArray will handle things for you if you use methods like apply_ufunc

Thanks for the suggestion. I haven't needed to regrid any data that is large enough and requires a distributed cluster. If there's any specific research needs I am willing to try.

We're certainly dealing with multi-terabyte datasets for which distributed computation isn't necessary, but is certainly convenient. I'll be giving a couple of talks about distributed XArray for data analysis in the next couple of weeks. It would be nice to point to this work as an example of an active community developing functionality with serial computation in mind that "just works" without having to think much about distributed computation. I believe that XArray's apply_ufunc was designed with this kind of thing in mind.

JiaweiZhuang commented 6 years ago

@mrocklin

I tested parallel regridding with Numba and got excellent performance. See this notebook (it is all about sparse dot and doesn't use ESMF/ESMPy at all). The entire workflow is now more bounded by IO then by regridding calculations.

mrocklin commented 6 years ago

We might also want to scale these computations on large datasets that don't fit in memory. Apply_ufunc is useful in the distributed or out-of-core case.

On Sun, Jan 14, 2018 at 2:11 PM, Jiawei Zhuang notifications@github.com wrote:

@mrocklin https://github.com/mrocklin

I tested parallel regridding with Numba and got excellent performance. See this notebook https://github.com/JiaweiZhuang/sparse_dot/blob/master/sparse_dot_benchmark.ipynb (it is all about sparse dot and doesn't use ESMF/ESMPy at all). The entire workflow is now more bounded by IO then by regridding calculations.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/JiaweiZhuang/xESMF/issues/3#issuecomment-357534115, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszGkSv2HfGtCUERNpeyFoiOlfqV_Zks5tKlFQgaJpZM4PIAzP .

jhamman commented 6 years ago

@JiaweiZhuang - have you tried using dask's dot product? If that can be made to work, we could use dask with xarray right out of the box to parallelize this step.

rabernat commented 6 years ago

I agree with Matt that we are talking about two different types of optimization here:

  1. A single regridding operation from one grid to another (given a sparse weight matrix). This is a highly "solved" problem, in the sense that it is a very common scientific computing task, with several high-performance implementations (scipy, numba, maybe also GPU?). At this stage, I am assuming that a single regridding operation fits in memory.

  2. Repeating this task many times over, say for each timestep or vertical level of a dataset. It is very unlikely that this can all fit in memory at once. But it is embarrassingly parallel. dask and xarray.apply_ufunc could be very helpful here.

rabernat commented 6 years ago

So far @JiaweiZhuang has been focused on optimizing 1. Those of us with more xarray and dask experience could help with 2.

JiaweiZhuang commented 6 years ago

@JiaweiZhuang - have you tried using dask's dot product? If that can be made to work, we could use dask with xarray right out of the box to parallelize this step.

Tried at the very beginning. dask.array.dot only works with dense matrices, so cannot be used here.

JiaweiZhuang commented 6 years ago

@mrocklin @rabernat @jhamman

I agree with Matt and Ryan's comments on out-of-core computing with dask and apply_ufunc. I've just done some experiments with xarray.apply_ufunc, but could not make it work. I would appreciate it if any of you can take a look at my code.

See this notebook in a standalone repo: apply_ufunc_with_dask.ipynb

I've simplified the problem so it is only about sparse matrix multiplication. The repo already contains the regridding weight file, so you don't need to use xESMF and ESMPy at all.

It is helpful to take a look at sparse_dot_benchmark.ipynb first. It contains detailed explanations on the sparse matrix multiplication algorithm and a successful parallel implementation with Numba. It can be used to benchmark the dask method.

JiaweiZhuang commented 6 years ago

Repeating this task many times over, say for each timestep or vertical level of a dataset. It is very unlikely that this can all fit in memory at once. But it is embarrassingly parallel. dask and xarray.apply_ufunc could be very helpful here.

And I guess it can also parallelize over multiple variables in a Dataset?

mrocklin commented 6 years ago

Tried at the very beginning. dask.array.dot only works with dense matrices, so cannot be used here.

Yeah, I agree that you don't necessarily want to invoke dask array operations here. It sounds like you have built a good numpy -> numpy function. I believe that apply_ufunc will help apply that function in parallel across many slices of many dataarrays of a distributed dataset.

shoyer commented 6 years ago

@JiaweiZhuang Try using output_core_dims and output_sizes in your last example, e.g., output_core_dims=[['out_grid']] and output_sizes={'out_grid': dr_dask.sizes['grid_dims'] // 2}

JiaweiZhuang commented 6 years ago

Try using output_core_dims and output_sizes in your last example, e.g., output_core_dims=[['out_grid']] and output_sizes={'out_grid': dr_dask.sizes['grid_dims'] // 2}

Thanks! Adding output_core_dims and output_sizes fixed the bug. It is faster than serial+dask but still slower than serial+numpy. I also tried manually parallelizing over chunks using dask.delayed, but the efficiency is not comparable with Numba's parallelism. See the same notebook. Seems like I should use Numba for in-memory parallelization, and apply_ufunc+dask only for out-of-core.

mrocklin commented 6 years ago

Yeah, to be clear, Dask-done-well is almost never faster than Numba-done-well. In the context of XArray the main advantage of dask is larger-than-memory computing.

mrocklin commented 6 years ago

You might want to try profiling, perhaps with %prun instead of %time or %snakeviz after you pip install snakeviz and %load_ext snakeviz to see what is taking time in the sequential case (profiling in the threaded case will be less informative).

JiaweiZhuang commented 6 years ago

You might want to try profiling, perhaps with %prun instead of %time or %snakeviz after you pip install snakeviz and %load_ext snakeviz to see what is taking time in the sequential case (profiling in the threaded case will be less informative).

concatenate takes a significant amount of time, for either apply_ufunc or the low-level implementation with dask.delayed. With Numba I can preallocate contiguous memory and don't need to concatenate chunks.

mrocklin commented 6 years ago

You might want to try timing with persist rather than compute. This will skip the concatenation, leaving the results as a dask array (but now with fully computed entries rather than lazy ones). This more accurately reflects how people are likely to use this in a distributed setting or as part of a larger workflow anyway.

shoyer commented 6 years ago

I would try using the Numba dot product (without Numba's parallelization) inside dask/xarray.apply_ufunc. You might even try Numba's guvectorize to build a gufunc directly.

Something seems to be going wrong with your benchmarks using dask. Maybe dot products with scipy's coo_matrix don't release the GIL?

JiaweiZhuang commented 6 years ago

You might want to try timing with persist rather than compute. This will skip the concatenation, leaving the results as a dask array (but now with fully computed entries rather than lazy ones). This more accurately reflects how people are likely to use this in a distributed setting or as part of a larger workflow anyway.

Using persist does remove the concatenation overhead, but the serial performance is still worse than the pure numpy version... But since we care more about out-of-core rather than parallelization in this case, this should be fine.

I would try using the Numba dot product (without Numba's parallelization) inside dask/xarray.apply_ufunc. You might even try Numba's guvectorize to build a gufunc directly.

Something seems to be going wrong with your benchmarks using dask. Maybe dot products with scipy's coo_matrix don't release the GIL?

guvectorize gives slightly slower performance than prange. But maybe I was not using the optimal setup.

Numba on dask array shows somewhat similar performance to Scipy on dask array. There is some speed-up but the parallel efficiency is not great.

All details are in this new notebook: numba_on_dask.ipynb

NicWayand commented 6 years ago

Just chiming in to support the push for xesmf handling out-of-memory! Getting MemoryError while regridding some GFDL model output on a local computer with 32GB of ram. Would be great to avoid looping over files. Thanks @JiaweiZhuang for a great package, its been very useful so far.

JiaweiZhuang commented 6 years ago

Just chiming in to support the push for xesmf handling out-of-memory!

Glad that xESMF helps! I will utilize xarray.apply_ufunc in v0.2 to enable out-of-core regridding of large data files.

zxdawn commented 5 years ago

How about multiprocessing? I've tried multiprocessing to regrid, but it's as same as before.

Here's the example.

I'm not familiar with multiprocessing, maybe something wrong with it?

JiaweiZhuang commented 4 years ago

v0.2 now supports parallel regridding with dask.

Distributed regridding is left to pangeo-data/pangeo#334