JiaweiZhuang / xESMF

Universal Regridder for Geospatial Data
http://xesmf.readthedocs.io/
MIT License

Support parallel regridding with dask-distributed #71

Open jhamman opened 4 years ago

jhamman commented 4 years ago

Previous issues have discussed supporting dask-enabled parallel regridding (e.g. #3). This seems to work with the threaded scheduler but not with the distributed scheduler. It should be doable at this point, with some work to solve a few serialization problems.

Current behavior

If you run the current dask regridding example in this repo's binder setup with dask-distributed, you get a bunch of serialization errors:

result = ds_out['air'].compute()  # actually applies regridding
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in serialize
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable.', 'subgraph_callable')
distributed.comm.utils - ERROR - ('Could not serialize object of type SubgraphCallable.', 'subgraph_callable')
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/utils.py", line 29, in _to_frames
    msg, serializers=serializers, on_error=on_error, context=context
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in serialize
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable.', 'subgraph_callable')
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/batched.py", line 93, in _background_send
    payload, serializers=self.serializers, on_error="raise"
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 227, in write
    context={"sender": self._local_addr, "recipient": self._peer_addr},
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/utils.py", line 37, in to_frames
    res = yield offload(_to_frames)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py", line 1370, in offload
    return (yield _offload_executor.submit(fn, *args, **kwargs))
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/srv/conda/envs/notebook/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/srv/conda/envs/notebook/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/utils.py", line 29, in _to_frames
    msg, serializers=serializers, on_error=on_error, context=context
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in serialize
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable.', 'subgraph_callable')

From what I can tell, it seems like there is some object that dask is trying to serialize that can't be pickled. Has anyone looked into this to diagnose why this is happening?

JiaweiZhuang commented 4 years ago

In https://github.com/pangeo-data/pangeo/issues/334#issuecomment-404393988, I got Dask-distributed working on a synthetic regridding problem (just a sparse matrix multiply) by using dask.array.map_blocks. Just switching to the distributed scheduler generally won't work, because the regridding weights are relatively large (often > 100 MB); they take a very long time to serialize and often kill the cluster, as detailed here.

To make Dask-distributed work, there needs to be an explicit call to broadcast the weights to all workers: weights_future = client.scatter(weights, broadcast=True). And then pass the future as an additional argument: da.map_blocks(apply_weights, input_data, weights_future, ...). See the full code in this notebook.
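For readers who cannot open the notebook, here is a minimal sketch of that pattern. It is not the notebook's code: the body of `apply_weights`, the helper name `regrid_blocks`, and the toy weights are assumptions made for illustration, and the data is assumed to be chunked along the first (time) axis only. The toy run uses the synchronous scheduler; the distributed variant is shown only as comments.

```python
import numpy as np
import scipy.sparse as sps
import dask.array as da


def apply_weights(block, weights):
    """Regrid one (n_time, n_in) block with sparse (n_out, n_in) weights."""
    return (weights @ block.T).T


def regrid_blocks(data, weights_handle, n_out):
    """Apply the weights block by block (hypothetical helper).

    `data` must be chunked along axis 0 only. `weights_handle` is either the
    sparse matrix itself (local schedulers) or the future returned by
    client.scatter(weights, broadcast=True).
    """
    return da.map_blocks(
        apply_weights,
        data,
        weights_handle,
        dtype=data.dtype,
        chunks=(data.chunks[0], (n_out,)),  # output keeps the time chunks
        meta=np.empty((0, 0), dtype=data.dtype),
    )


# Toy problem: average neighbouring pairs of 6 input cells into 3 output cells.
weights = sps.csr_matrix(np.kron(np.eye(3), [[0.5, 0.5]]))
data = da.from_array(np.arange(24.0).reshape(4, 6), chunks=(2, 6))
local_result = regrid_blocks(data, weights, n_out=3).compute(scheduler="synchronous")

# With a distributed cluster (not run here), broadcast the weights once and
# pass the future instead of the matrix:
# from dask.distributed import Client
# client = Client()
# weights_future = client.scatter(weights, broadcast=True)
# result = regrid_blocks(data, weights_future, n_out=3).compute()
```

Scattering once means the large matrix is not embedded in every task of the graph, which is the serialization cost described above.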

I haven't tested dask-distributed on xarray DataArray/Dataset yet, and your error might be due to a different issue associated with xarray metadata rather than dask. A quick way to test this is to pass the raw data instead, i.e. regridder(ds['air'].data), since xesmf also works on pure dask arrays.

JiaweiZhuang commented 4 years ago

Hmm, with regridder(ds['air'].data), I got ValueError: ctypes objects containing pointers cannot be pickled at the end of this long trace:

```
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
   3173     try:
-> 3174         result = cache[func]
   3175     except KeyError:

/srv/conda/envs/notebook/lib/python3.7/site-packages/zict/lru.py in __getitem__(self, key)
     47     def __getitem__(self, key):
---> 48         result = self.d[key]
     49         self.i += 1

TypeError: unhashable type: 'SubgraphCallable'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x)
     37     try:
---> 38         result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     39         if len(result) < 1000:

ValueError: ctypes objects containing pointers cannot be pickled

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
in

/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self, traverse=False, **kwargs)
    166         return result
    167

/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk, keys, **kwargs)
    437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    438

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2532                 retries=retries,
   2533                 user_priority=priority,
-> 2534                 actors=actors,
   2535             )
   2536             packed = pack_data(keys, futures)

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2459                 {
   2460                     "op": "update-graph",
-> 2461                     "tasks": valmap(dumps_task, dsk3),
   2462                     "dependencies": dependencies,
   2463                     "keys": list(flatkeys),

/srv/conda/envs/notebook/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/srv/conda/envs/notebook/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py in dumps_task(task)
   3209         return d
   3210     elif not any(map(_maybe_complex, task[1:])):
-> 3211         return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
   3212     return to_serialize(task)
   3213

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
   3178         cache[func] = result
   3179     except TypeError:
-> 3180         result = pickle.dumps(func)
   3181     return result
   3182

/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x)
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

/srv/conda/envs/notebook/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
   1123     try:
   1124         cp = CloudPickler(file, protocol=protocol)
-> 1125         cp.dump(obj)
   1126         return file.getvalue()
   1127     finally:

/srv/conda/envs/notebook/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    480         self.inject_addons()
    481         try:
--> 482             return Pickler.dump(self, obj)
    483         except RuntimeError as e:
    484             if 'recursion' in e.args[0]:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in dump(self, obj)
    435         if self.proto >= 4:
    436             self.framer.start_framing()
--> 437         self.save(obj)
    438         self.write(STOP)
    439         self.framer.end_framing()

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550
    551     def persistent_id(self, obj):

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    636         else:
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)
    640

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_tuple(self, obj)
    784         write(MARK)
    785         for element in obj:
--> 786             save(element)
    787
    788         if id(obj) in memo:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_dict(self, obj)
    854
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857
    858     dispatch[dict] = save_dict

/srv/conda/envs/notebook/lib/python3.7/pickle.py in _batch_setitems(self, items)
    885                 k, v = tmp[0]
    886                 save(k)
--> 887                 save(v)
    888                 write(SETITEM)
    889             # else tmp is empty, and we're done

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle. Same as in the big comment below.
    773             if id(obj) in memo:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506

/srv/conda/envs/notebook/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_instancemethod(self, obj)
    888         else:
    889             if PY3: # pragma: no branch
--> 890                 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
    891             else:
    892                 self.save_reduce(

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    636         else:
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)
    640

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle. Same as in the big comment below.
    773             if id(obj) in memo:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550
    551     def persistent_id(self, obj):

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_dict(self, obj)
    854
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857
    858     dispatch[dict] = save_dict

/srv/conda/envs/notebook/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550
    551     def persistent_id(self, obj):

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save_dict(self, obj)
    854
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857
    858     dispatch[dict] = save_dict

/srv/conda/envs/notebook/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

/srv/conda/envs/notebook/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    522             reduce = getattr(obj, "__reduce_ex__", None)
    523             if reduce is not None:
--> 524                 rv = reduce(self.proto)
    525             else:
    526                 reduce = getattr(obj, "__reduce__", None)

ValueError: ctypes objects containing pointers cannot be pickled
```

JiaweiZhuang commented 4 years ago

Aha, problem solved. Just set this before applying the regridder to data:

regridder._grid_in = None
regridder._grid_out = None

regridder._grid_in was linked to ESMF objects that involve f2py and ctypes, and Dask was having trouble pickling it. In the next version I will make sure that the Regridder class does not refer to any ESMF objects.
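
The failure mode can be reproduced without xESMF or ESMF. The sketch below uses a hypothetical stand-in object (a `SimpleNamespace`, not the real Regridder class) whose `_grid_in` and `_grid_out` attributes hold ctypes pointers, which is only an assumption about what the ESMF handles look like to pickle. It shows that the object pickles once those attributes are set to None.

```python
import ctypes
import pickle
from types import SimpleNamespace


def make_toy_regridder():
    """Hypothetical stand-in for a regridder that keeps ESMF-like handles."""
    return SimpleNamespace(
        weights=[0.25, 0.75],  # plain data, picklable
        _grid_in=ctypes.pointer(ctypes.c_int(1)),   # pointer-backed handle
        _grid_out=ctypes.pointer(ctypes.c_int(2)),  # pointer-backed handle
    )


def try_pickle(obj):
    """Return None if obj pickles, otherwise the ValueError message."""
    try:
        pickle.dumps(obj)
    except ValueError as err:
        return str(err)
    return None


regridder = make_toy_regridder()
error_before = try_pickle(regridder)  # fails because of the ctypes pointers

# Same workaround as above: drop the references before serializing.
regridder._grid_in = None
regridder._grid_out = None
error_after = try_pickle(regridder)

print(error_before)
print(error_after)
```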

JiaweiZhuang commented 4 years ago

The explicit broadcasting of regridding weights is still TBD, though.

In https://github.com/pangeo-data/pangeo/issues/334#issuecomment-404003411, I was thinking about adding an explicit regridder.set_distributed(client=client) call, to send the weights to all worker nodes. Then, regridder.weights will become a Dask future pointing to the distributed weights on all workers.

Or maybe there is a cleverer way to hide this explicit call from users. Any suggestions & PRs are extremely welcome!
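
A rough sketch of what such a call could look like, to make the proposal concrete. `WeightsHolder` is a hypothetical class used only for illustration, and `set_distributed` is the proposed method, not an existing xESMF API. The only real call assumed is `Client.scatter(data, broadcast=True)` from dask.distributed.

```python
class WeightsHolder:
    """Hypothetical stand-in for the part of a regridder that owns the weights."""

    def __init__(self, weights):
        self.weights = weights

    def set_distributed(self, client):
        """Send the weights to all workers and keep only the returned future.

        `client` is expected to behave like dask.distributed.Client.
        """
        self.weights = client.scatter(self.weights, broadcast=True)
        return self
```

After the call, anything that previously received the weights matrix would receive the future instead, so the matrix is transferred to each worker once rather than with every task.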

dgergel commented 4 years ago

@JiaweiZhuang in your example notebook that you include above, what versions of xESMF and dask distributed are you using? I am still getting the ...cannot be pickled... error when I replicate your sample workflow.