JiaweiZhuang / xESMF

Universal Regridder for Geospatial Data
http://xesmf.readthedocs.io/
MIT License

Spatial chunking for concurrent dask regridding #26

Closed (bekozi closed this 5 years ago)

bekozi commented 5 years ago

@JiaweiZhuang, this notebook PR implements an example of spatial chunking with conversion to xarray. I thought it would be pretty easy for you to add xESMF code to this. I can give the xESMF code a shot if you don't have time.

There are a number of general features that should be added, but I'm not quite sure how to address them yet. I thought it best to let this evolve a little before jumping to conclusions (I'm also new to dask in general). I am definitely not expecting to merge this right away. Some topics:

Note: You'll need to use the ocgis master branch for this. I needed to add chunk creation for a specific index and the "decode_cf" option for the xarray conversion. Conversion to datetime objects in xarray can be a bit slow, especially if it is not needed!

codecov-io commented 5 years ago

Codecov Report

Merging #26 into master will not change coverage. The diff coverage is n/a.


@@           Coverage Diff           @@
##           master      #26   +/-   ##
=======================================
  Coverage   95.19%   95.19%           
=======================================
  Files           6        6           
  Lines         229      229           
=======================================
  Hits          218      218           
  Misses         11       11


JiaweiZhuang commented 5 years ago

That's fast work! Interesting that mpi4py is not used at all. So your plan is to use only dask for parallelization? That would be very cool if possible!

Seems like the current code only does spatial chunking but not regridding yet? It would be necessary to test if this method scales with the number of CPUs (on a single multi-core computer, not talking about distributed yet). I am concerned about calling complicated library functions in each dask worker. Do all ocgis functions (chunking, regridding...) release the GIL?

Not sure where this example should go. Is the xESMF repo not good? I just made an examples directory for a placeholder...

Probably in OCGIS's repo & docs? Given that you are not using xarray here, this seems like a general OCGIS example. If dask really works it might become the standard way of parallelizing ESMPy/OCGIS. Not having to spawn MPI processes is a big plus for users.

The chunks are created and the data loaded via ocgis. It is then converted to xarray. This is not great for workflows that wish to start with xarray. I think an xarray-to-ocgis converter will address this (the grid chunker does not have to work with files).

I suspect that pure numpy arrays will suffice. Metadata can be easily fixed after all heavy computations are done.

If you only care about chunking coordinates (for parallel weight generation), not chunking input data, the workflow is simply:
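Roughly something like the sketch below. The two helpers are placeholders (not actual ocgis or xESMF API) standing in for GridChunker-style coordinate subsetting and per-chunk weight generation; only the dask plumbing is meant literally:

```python
# Sketch only: chunk_coordinates() and compute_chunk_weights() are placeholder
# names for GridChunker-style subsetting and ESMPy/xESMF weight generation.
# They shuffle plain numpy arrays so the dask plumbing itself is runnable.
import numpy as np
import dask
from dask import delayed


def chunk_coordinates(lat, lon, nchunks):
    """Yield latitude-band subsets of the destination coordinates."""
    for rows in np.array_split(np.arange(lat.shape[0]), nchunks):
        yield lat[rows, :], lon[rows, :]


@delayed
def compute_chunk_weights(chunk_lat, chunk_lon):
    """Placeholder for per-chunk weight generation (ESMPy/xESMF would go here)."""
    n = chunk_lat.size
    return {"row": np.arange(n), "col": np.arange(n), "S": np.ones(n)}


# 2D destination coordinates; the source grid is omitted for brevity
lat2d, lon2d = np.meshgrid(np.linspace(-89.5, 89.5, 180),
                           np.linspace(0.5, 359.5, 360), indexing="ij")

tasks = [compute_chunk_weights(la, lo)
         for la, lo in chunk_coordinates(lat2d, lon2d, nchunks=8)]
chunked_weights = dask.compute(*tasks)
# Each element holds that chunk's (row, col, S) triplets; re-index them to
# global indices and concatenate to assemble the full weight matrix.
```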

I don't feel like the source/destination grid slices should be generated on the workers necessarily. The index approach feels a bit clunky (better for concurrency however). The spatial chunking can be pulled into the master task without too much trouble.

Again, I am just concerned about scalability. Not sure how much heavy lifting GridChunker is doing under the hood. Then we need to call ESMPy regridding immediately after that, inside each dask worker.

The task does not return or fill anything. It will be good to add some distributed arrays to the example along the lines of the xESMF-dask example you already created. Eventually, I want to persist the regridded data in a netCDF format matching the destination grid's.

Yes, in the pure ESMPy/OCGIS approach you will apply ESMPy's black-box regridders to each "chunked array" (not a standard chunked dask array, actually; each source "chunk" overlaps with its neighbors). For xESMF I am in favor of native scipy/numba SMM functions (requires in-memory weight retrieval) as it generally incurs less overhead, particularly when working with dask...
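For reference, the SMM step itself is just a sparse matrix product over the flattened grid; a minimal sketch with stand-in weights (real weights would come from ESMF):

```python
# Weight application as sparse matrix multiplication (SMM); the weights here
# are random stand-ins with the same shape a real weight matrix would have.
import numpy as np
import scipy.sparse as sps

ny_in, nx_in = 90, 180        # source grid shape
ny_out, nx_out = 45, 90       # destination grid shape

weights = sps.random(ny_out * nx_out, ny_in * nx_in, density=1e-4, format="csr")
data_in = np.random.rand(ny_in, nx_in)

data_out = weights.dot(data_in.reshape(-1)).reshape(ny_out, nx_out)
```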

I'll be interested to see how this works with threads. If it does and memory is not a constraint, we should see some pretty good xESMF performance gains.

Threads should be much faster than multi-processing if all OCGIS/ESMPy functions release the GIL. Given that they've been relying on MPI, I guess the GIL hasn't been a concern at all?

JiaweiZhuang commented 5 years ago

I am actually having trouble with running the code: NCPP/ocgis#489

bekozi commented 5 years ago

Thanks for the timely response. You gave me a number of things to consider.

That's fast work!

Ha, well, thankfully it was mostly ready to go. That said, I was planning to develop this example with xESMF, so I didn't test other regridding. Investigating threads a bit more, I learned that ESMF must be built with ESMF_COMM='mpiuni', ESMF_PTHREAD='OFF', and potentially ESMF_OPENMP='OFF' for it to play nicely with the dask scheduler. So, yeah, it should have been slower. I'm honestly a little surprised at these issues, and solving them will probably mean digging deep into dask. The ocgis/xarray code works with no problems.

Interesting that mpi4py is not used at all. So your plan is to use only dask for parallelization? That would be very cool if possible!

Yes, the plan is to use dask for situations where the regridding operation requires out-of-core memory. Dask will also be useful for testing with pangeo to see about elastic scaling (i.e. run the operation on four VMs concurrently as opposed to four times on one VM). It would be nice to get the hybrid dask/MPI working on independent VMs. That comes later though!

I'm thinking this may have to go the route of one process per VM as opposed to multiple processes/threads per VM. At least for now. Dask concurrency is causing some fun, undefined memory issues when running under both the "processes" and "threads" schedulers. The "synchronous" scheduler seems okay. The "processes" issues really surprise me as I expected process isolation, and I think there should be a way to address this.
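For anyone following along, switching between those schedulers looks roughly like this with a recent dask (nothing specific to this PR):

```python
# Switching between the dask schedulers mentioned above (current dask API).
import dask
import dask.bag as db

work = db.from_sequence(range(8)).map(lambda i: i ** 2)

work.compute(scheduler="synchronous")   # single-threaded, easiest to debug
work.compute(scheduler="threads")       # needs GIL-releasing ESMF/ocgis calls
work.compute(scheduler="processes")     # process isolation, more serialization

with dask.config.set(scheduler="processes"):   # or set it for a whole block
    work.compute()
```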

Seems like the current code only does spatial chunking but not regridding yet?

I was thinking you might want to try xESMF as the regridder. I'll need to get appropriate builds for dask-friendly ESMF on conda unless you want to build your own without threading support.

It would be necessary to test if this method scales with the number of CPUs (on a single multi-core computer, not talking about distributed yet). I am concerned about calling complicated library functions in each dask worker. Do all ocgis functions (chunking, regridding...) release the GIL?

You're right that threads may be a total non-starter. I really didn't think that through.

Probably in OCGIS's repo & docs? Given that you are not using xarray here, this seems like a general OCGIS example. If dask really works it might become the standard way of parallelizing ESMPy/OCGIS. Not having to spawn MPI processes is a big plus for users.

I do already have it in the ocgis repo. This code was designed to be part of the dask/xESMF workflow to avoid computing weights on the main process only. Maybe I should move this PR over to that repo?

Both esmpy and ocgis use MPI pretty densely at points, but the goal will be to use dask where possible, in ocgis at least. The async MPI is the hardest to deal with... Esmpy dask-parallelization will probably require something like this for the foreseeable future. Hopefully the dask-related memory issues can be resolved.

I suspect that pure numpy arrays will suffice. Metadata can be easily fixed after all heavy computations are done.

Sure. The xarray (or ocgis) conversion is optional if pure numpy is fine.

If you only care about chunking coordinates (for parallel weight generation), not chunking input data, the workflow is simply:

Makes sense. What I have now is how I would use the workflow, but it is good to think about how it may be adapted to other scenarios. Are you able to work with xESMF in the file-based context?

Eventually, I imagine chunking along all dimensions would be desirable since long time-series are often the primary bottleneck.

Again, I am just concerned about scalability. Not sure how much heavy lifting GridChunker is doing under the hood. Then we need to call ESMPy regridding immediately after that, inside each dask worker.

I think we are on the same page here. Unless we are working with certain unstructured grids, the GridChunker is doing comparatively little - especially with well-behaved rectilinear grids.

Reassembling a weight file is tricky since offsets must be tracked for each subset to ensure the local-global indices are equivalent. This would need to be adapted to the dask workflow. You are probably interested in creating global weight files.

Yes, in the pure ESMPy/OCGIS approach you will apply ESMPy's black-box regridders to each "chunked array" (not a standard chunked dask array, actually; each source "chunk" overlaps with its neighbors).

If only the regridding needs this overlap, it probably doesn't make sense to worry about chunking the source data; we can chunk only the destination since that's what we care about. Again, this is not so much a concern with the file-based approach. File locks would be the way to go for reassembling the global file to avoid shipping data around. This all hinges on each worker having access to the same storage, of course...
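A sketch of the file-lock idea with dask.distributed follows; the output path, variable name, and chunk layout are made up, and it assumes the destination file is pre-created and that every worker sees the same filesystem:

```python
# Sketch of serializing writes to a shared output file with a distributed lock.
# The file name, variable name, and chunk layout are made up for illustration.
import numpy as np
import netCDF4
from dask.distributed import Client, Lock

NY, NX, PATH = 180, 360, "regridded.nc"


def write_chunk(chunk, y0, y1):
    """Write one destination-chunk result; the regridding itself is omitted."""
    with Lock("output-file"):                     # one writer at a time
        with netCDF4.Dataset(PATH, "a") as nc:
            nc.variables["data"][y0:y1, :] = chunk
    return (y0, y1)


if __name__ == "__main__":
    # pre-create the destination file with its full dimensions
    with netCDF4.Dataset(PATH, "w") as nc:
        nc.createDimension("y", NY)
        nc.createDimension("x", NX)
        nc.createVariable("data", "f8", ("y", "x"))

    client = Client()                             # local cluster for testing
    bounds = np.linspace(0, NY, 5, dtype=int)     # four latitude-band chunks
    futures = [client.submit(write_chunk, np.full((y1 - y0, NX), float(i)), y0, y1)
               for i, (y0, y1) in enumerate(zip(bounds[:-1], bounds[1:]))]
    client.gather(futures)
```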

For xESMF I am in favor of native scipy/numba SMM functions (requires in-memory weight retrieval) as it generally incurs less overhead, particularly when working with dask...

Yep, that's fine.

Threads should be much faster than multi-processing if all OCGIS/ESMPy functions release the GIL. Given that they've been relying on MPI, I guess the GIL hasn't been a concern at all?

You got it. I've worked within the GIL as it relates to Python threading, but not in the context of a dask scheduler.

JiaweiZhuang commented 5 years ago

Let me re-organize the topics because the discussion is getting convoluted...

1. Using dask to parallelize chunked weight generation

My understanding of the current status is that:

Regarding the two comments:

I'm thinking this may have to go the route of one process per VM as opposed to multiple processes/threads per VM.

It would be nice to get the hybrid dask/MPI working on independent VMs.

Given that weight generation is very compute-intensive, it seems quite wasteful to just use one single core per VM. For a ~10 km global grid, I/O is not an issue at all (only ~100 MB for the coordinate values), while computing weights can take many minutes on a single core. For a ~1 km global grid, I/O and memory become a pain (~10 GB for the coordinate values), but computing weights is an even bigger pain... Here you need both out-of-core computation and parallelism, otherwise the process will be compute-bound.
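(For reference, a back-of-envelope version of those numbers, counting only double-precision lat/lon cell centers on a global lat-lon grid:)

```python
# Back-of-envelope size of the coordinate arrays alone (float64 lat + lon
# centers on a global lat-lon grid; cell corners would add several times more).
def coord_bytes(res_deg):
    ncell = int(360 / res_deg) * int(180 / res_deg)
    return ncell * 2 * 8          # two arrays (lat, lon), 8 bytes per value

print(coord_bytes(0.1) / 1e6, "MB")   # ~10 km grid -> roughly 100 MB
print(coord_bytes(0.01) / 1e9, "GB")  # ~1 km grid  -> roughly 10 GB
```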

I think we must pick one way:

The hybrid one feels very tricky, so you might first try whether pure dask can work...

2. Reading chunks from files versus in-memory split

I see that GridChunker reads chunks from files, from the Chunked Regrid Weight Generation example.

Regarding the two comments:

Are you able to work with xESMF in the file-based context?

(the grid chunker does not have to work with files).

Do you mean that GridChunker can be made to work with in-memory arrays with additional effort, but by default has to read from files?

I absolutely want to work with in-memory numpy arrays. Reasons include:

It would be best if everything could be done in memory, including chunking the grid coordinates, in-memory retrieval (#11), and reassembling the chunked weights. I am worried that ocli seems to be designed to work with netCDF files.

3. How to plug the regridding step into the current chunking framework

In response to the comment

I was thinking you might want to try xESMF as the regridder.

I probably shouldn't call high-level xESMF functions inside apply_by_spatial_chunk in your example. Each worker will call ESMPy to write weight files to disk and read them back. I don't expect a performance gain because of the above two issues (parallel efficiency and unnecessary I/O) 🤔
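Concretely, that pattern would look something like the sketch below (the per-chunk datasets are assumed to come from the chunking step; each xe.Regridder construction currently goes through an on-disk weight file):

```python
# Sketch of the pattern in question: building a separate xESMF regridder for
# each spatial chunk inside a dask task. The chunk datasets (with 'lat'/'lon')
# are assumed to come from the spatial chunking step.
import xesmf as xe
from dask import delayed


@delayed
def regrid_chunk(ds_src_chunk, ds_dst_chunk, data_chunk):
    regridder = xe.Regridder(ds_src_chunk, ds_dst_chunk, "bilinear")
    out = regridder(data_chunk)        # apply this chunk's weights to its data
    regridder.clean_weight_file()      # remove the intermediate weight file
    return out
```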

How would you deal with the chunked src_grid in a traditional OCGIS/MPI approach (i.e., without calling src_grid.parent.to_xarray())? From the OCGIS regridding example, regridding is done by calling ocgis.OcgOperations on an ocgis.Field object. Can it write out weights similar to ESMPy's ESMF.Regrid(filename='...')? If #11 is solved, will a similar option be available in ocgis's API?

bekozi commented 5 years ago

Let me re-organize the topics because the discussion is getting convoluted...

:+1: Good call. The discussion was starting to need a TOC. I added a section 4 at the end for next steps.

1. Using dask to parallelize chunked weight generation

I agree that threads should be a long-term goal if it is even worth pursuing. We should be able to use processes within the spatial chunking context, but, like you say, it will take some debugging. If memory is not a concern, routine deep copying may help secure memory.

Given that weight generation is very compute-intensive, it seems quite wasteful to just use one single core per VM...

It is definitely wasteful depending on the size of the VMs! A swarm of small VMs could potentially be used but it all comes down to performance tradeoffs within workflows.

Use pure dask for parallelization and gain scalability on a single machine, and then naturally go distributed

This makes sense to me and is the goal of the spatial chunking. We'll need to figure out process localization, and I'll need to debug why ESMF initialization does not like the dask process scheduler. Hybrid MPI/dask would be super cool, but that's future stuff.

2. Reading chunks from files versus in-memory split

Do you mean that GridChunker can be made to work with in-memory arrays with additional effort, but by default has to read from files?

The command line interface works only for NetCDF files. The GridChunker object can work with in-memory arrays. This is the object to use for spatial chunk creation. I don't think it makes sense to use the CLI within this dask example. However, one could imagine cheating by executing an MPI-enabled executable through "mpirun" on dask workers.
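Very roughly, using the object directly looks something like this sketch (the constructor arguments and the iteration call are my best-guess placeholders, not verified GridChunker signatures):

```python
# Sketch only: GridChunker is the real ocgis object, but the constructor
# arguments and the iteration method used below are assumptions about its
# API, not verified signatures.
import ocgis
from ocgis.spatial.grid_chunker import GridChunker

src_grid = ocgis.RequestDataset("src.nc").get().grid   # placeholder file names
dst_grid = ocgis.RequestDataset("dst.nc").get().grid

gc = GridChunker(src_grid, dst_grid, nchunks_dst=(4, 4))          # assumed signature
for src_sub, src_slc, dst_sub, dst_slc in gc.iter_src_grid_subsets(yield_dst=True):
    # Each subset is an in-memory ocgis grid; hand it to the regridding task,
    # e.g. via dst_sub.parent.to_xarray() as discussed earlier in this thread.
    pass
```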

Your reasons for using in-memory arrays make sense, and I also think this is where the example should go. Regarding zarr, an ocgis driver could be added, but I don't think the effort is warranted yet and xarray is plenty sufficient!

It would be best if everything could be done in memory

Sure. It will be good to minimize data transfers but since we're working with a naive example we can shelve this concern for the moment.

in-memory retrieval (#11)

Yes, we are honestly nearly there and will get this out as soon as we can. Our internal three-week deadline has passed, I know...

3. How to plug the regridding step into the current chunking framework

I probably shouldn't call high-level xESMF functions inside apply_by_spatial_chunk in your example. Each worker will call ESMPy to write weight files to disk and read them back. I don't expect a performance gain because of the above two issues (parallel efficiency and in-memory arrays)

This will turn into a nuanced discussion that we should have at another time. Once ESMPy in-memory weight retrieval is in, I think performance will be boosted for certain grids and cluster configurations. This is all situation-dependent of course. I agree with you given the current state of this example!

How would you deal with the chunked src_grid in a traditional OCGIS/MPI approach (i.e., without calling src_grid.parent.to_xarray())?

Xarray is not used at all in the "traditional approach". In the case where the global regridding operation can fit in machine memory, one just calls an ocgis script using mpirun. For the chunked case, the ocli CLI is used, which orchestrates creating the file chunks, doing all the regridding, etc. This may also run in parallel and is not bad I/O-wise provided netcdf4-python parallel writes are used. You can imagine the on-disk tradeoffs going on in this workflow.

From the OCGIS regridding example, regridding is done by calling ocgis.OcgOperations on an ocgis.Field object. Can it write out weights similar to ESMPy's ESMF.Regrid(filename=...)?

The RegridOperation handles all the ESMPy interaction and can write weights, etc. Some of the more advanced ESMPy features are in a branch related to https://github.com/NCPP/ocgis/issues/488 which should be in the ocgis trunk soon.

If #11 is solved, will a similar option be available in ocgis's API?

I suppose we would bring it in, but I hadn't really thought about it. The weights-to-file or direct ESMF SMM has been sufficient.

In sum, the file-based MPI regridding approach solves the big-grid problem using "old school" tools. It works well enough, but I consider it a static solution. I believe xESMF is the right front-end for dask integration, which will offer a dynamic treatment of big grids. Ocgis could be used in this example, I suppose, but it doesn't offer threaded SMM, for example. I think ocgis can offer a number of useful features moving forward.

4. Next steps

Here are some potential next steps. What do you think?

JiaweiZhuang commented 5 years ago

I agree with most of your comments -- glad that we've reached a consensus😃

Also agree with the next steps. Let me order them in a progressive way, so we can get some obvious outcome at each stage (instead of fighting with ESMF+dask for months!).

Stage 1. In-memory weight retrieval

I want to solve #11 before making other major updates to xESMF. Serial computation is totally fine at this stage.

Then I would hope any later parallel-computing enhancement will be based on this in-memory approach instead of the file I/O approach. It can indeed cause some pain during initial development, but it is good for the long term. Pure numpy arrays are a lot easier to handle than "NetCDF files with a specific format requirement" (for both users and developers).
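For context, the file-based path that in-memory retrieval would replace looks roughly like this: ESMF writes (row, col, S) triplets to a netCDF weight file, which then has to be read back into a sparse matrix ("weights.nc" is a placeholder name):

```python
# Reconstructing the sparse weight matrix from an ESMF offline weight file.
# "weights.nc" is a placeholder path; n_a/n_b/col/row/S are the standard names
# in ESMF weight files, and the indices are 1-based.
import xarray as xr
import scipy.sparse as sps

ds = xr.open_dataset("weights.nc")
n_in = ds.dims["n_a"]            # number of source cells
n_out = ds.dims["n_b"]           # number of destination cells

weights = sps.coo_matrix(
    (ds["S"].values, (ds["row"].values - 1, ds["col"].values - 1)),
    shape=(n_out, n_in),
).tocsr()
# In-memory retrieval (#11) would hand these triplets back directly from
# ESMPy, skipping the round trip through the netCDF file.
```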

Outcome:

Stage 2. Parallelized weight generation with ESMF + dask

Outcome:

~1 km grids are still rare right now, so at this stage I care more about computational speed-up than memory constraints. This stage does not involve weight application, which might still be done in an embarrassingly parallel way if the weights aren't too large. Working towards ~1 km grids is a future direction, but I want to make sure the effort we put in shows immediate benefit for the majority of users, who are dealing with 10~100 km grids. Bringing down the weight computation time from several minutes (common for a ~10 km grid) to tens of seconds is quite a decent achievement.
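(The embarrassingly parallel application mentioned above can stay very simple once the weights are an in-memory sparse matrix; a sketch with dask.array, chunking only along time and using random stand-ins for the weights and data:)

```python
# Sketch: apply a single in-memory sparse weight matrix to data that is
# chunked only along time. The weights and data are random stand-ins.
import dask.array as dsa
import scipy.sparse as sps

ny_in, nx_in, ny_out, nx_out = 90, 180, 45, 90
weights = sps.random(ny_out * nx_out, ny_in * nx_in, density=1e-4, format="csr")

data = dsa.random.random((365, ny_in, nx_in), chunks=(30, ny_in, nx_in))


def apply_weights(block):
    nt = block.shape[0]
    flat_out = weights.dot(block.reshape(nt, -1).T).T     # SMM per time chunk
    return flat_out.reshape(nt, ny_out, nx_out)


out = data.map_blocks(
    apply_weights,
    dtype=data.dtype,
    chunks=(data.chunks[0], (ny_out,), (nx_out,)),
)
result = out.compute()            # each time chunk regrids independently
```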

Stage 3. Spatial-chunked regridding (weight application)

Outcome: Out-of-core regridding for ultra-high-horizontal-resolution data

This is only for ~1 km grids over a large region, where even a single layer of input data and the regridding weights themselves cannot comfortably fit into memory. Mere out-of-core computation on a single machine (which needs to swap both data and weights!) would be deadly slow, so this approach should be used with dask-distributed, where each chunk of weights lives on a different worker. I bet that pulling the correct input data chunk to each worker will cause lots of fun and trouble. This stage is too early to discuss in detail, anyway.

4. OCGIS <-> xarray conversion

ocgis.from_xarray() / xarray.Dataset.to_ocgis() would be good to have for both packages, especially for unstructured grids that need additional data structures (#18). For xESMF I am still sticking to structured grids, so simple 2D numpy arrays will suffice, and I don't have an obvious personal need for this.

Disclaimer: This outline is based on my understanding of general scientific users' needs. If you need to process ~1 km data you may indeed want to prioritize Stage 3 🤔

bekozi commented 5 years ago

This is a great summary! Thanks. I agree that time could easily be spent in the wrong places. The ESMF/dask compatibility could be a rabbit trail indeed.

Your prioritization makes sense. I know #11 will be critical for you to move forward.

So, I think we should make tickets for the orphan topics and probably close this PR (can open later if the code is relevant). Did you want me to take a crack at that (I'll mostly just use your text anyway)?

Bringing down the weight computation time from several minutes (common for a ~10 km grid) to tens of seconds is quite a decent achievement.

Parallel weight generation on the master worker could help here if you were looking for something more immediate... I think we had talked about this.

JiaweiZhuang commented 5 years ago

So, I think we should make tickets for the orphan topics and probably close this PR (can open later if the code is relevant). Did you want me to take a crack at that (I'll mostly just use your text anyway)?

Sure, please do so!

Parallel weight generation on the master worker could help here if you were looking for something more immediate... I think we had talked about this.

Do you mean MPI? I'd like to avoid adding MPI as a dependency if there's a pure dask solution (I feel that most Earth scientists don't know MPI programming). If dask+ESMF cannot work, then I will fall back to the old MPI solution.