pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License
163 stars 25 forks source link

Refactor executors #77

Closed rabernat closed 3 years ago

rabernat commented 3 years ago

Overview

This is a major refactor of the internals of rechunker. I have changed the interface between the executors and the rechunking plan by adding some new types. The executors all now accept something called a ParallelPipelines object. The hierarchy of types looks like this

ParallelPipelines = Iterable[MultiStagePipeline]
MultiStagePipeline = Iterable[Stage]
class Stage(NamedTuple):
    func: Callable
    map_args: Optional[Iterable] = None

Stages contain a single function that is mapped across many inputs (e.g. a single copy operation). MultiStagePipelienes contain multiple Stages (e.g. copy source to intermediate, then intermediate to target). ParallelPipelines contain several MultiStagePipelines that can be executed in parallel.

The rechunk function now contains a line called pipelines = specs_to_pipelines(copy_spec) which translates a list of CopySpecs to a ParallelPipelines. This function contains all of the logic about how to execute a copy operation. All the executors needs to do is know how to execute a generic ParallelPipelines.

Motivation

The abstractions we have created in rechunker, which allow many different distributed execution engines to be used for the same computation, are very useful and cool. The underlying motivation for this refactor was to make the executors more general, such that they can be used in other projects (e.g. Pangeo Forge). This is accomplished by decoupling the details of the rechunking operation as currently implemented from the Executor class.

Pros

Beyond the motivation above, this approach has a couple of major benefits:

Cons

But there are some downsides:

Todo

At this point I would love to get a preliminary review from anyone who is interested.

shoyer commented 3 years ago

I like this general idea! My main concern is that this would preclude the option to avoid using intermediate arrays in favor of executor-native groupby operations, e.g., like what is sketched out in https://github.com/pangeo-data/rechunker/pull/36 for Beam. In principle, avoiding the intermediate copy could be up to twice as fast, if an executor like Beam or Spark manages to hold all the intermediate values in memory instead of dumping to disk.

rabernat commented 3 years ago

Good point Stephan. I think there is a simple resolution; we make the CopySpec -> Pipeline translation optional and allow executors to natively work on CopySpecs if they prefer. Stand by for an update to implement this.

shoyer commented 3 years ago

Good point Stephan. I think there is a simple resolution; we make the CopySpec -> Pipeline translation optional and allow executors to natively work on CopySpecs if they prefer. Stand by for an update to implement this.

Sounds good to me!

codecov[bot] commented 3 years ago

Codecov Report

Merging #77 (a5a3a29) into master (c59f303) will decrease coverage by 0.31%. The diff coverage is 98.13%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master      #77      +/-   ##
==========================================
- Coverage   97.76%   97.44%   -0.32%     
==========================================
  Files          10       12       +2     
  Lines         447      509      +62     
  Branches       89       93       +4     
==========================================
+ Hits          437      496      +59     
- Misses          5        7       +2     
- Partials        5        6       +1     
Impacted Files Coverage Δ
rechunker/executors/util.py 100.00% <ø> (ø)
rechunker/executors/dask.py 94.54% <94.00%> (-5.46%) :arrow_down:
rechunker/algorithm.py 82.45% <100.00%> (ø)
rechunker/api.py 100.00% <100.00%> (ø)
rechunker/compat.py 100.00% <100.00%> (ø)
rechunker/executors/__init__.py 100.00% <100.00%> (ø)
rechunker/executors/beam.py 100.00% <100.00%> (ø)
rechunker/executors/prefect.py 100.00% <100.00%> (ø)
rechunker/executors/python.py 100.00% <100.00%> (ø)
rechunker/executors/pywren.py 100.00% <100.00%> (ø)
... and 4 more

Continue to review full report at Codecov.

Legend - Click here to learn more Δ = absolute <relative> (impact), ø = not affected, ? = missing data Powered by Codecov. Last update c59f303...a5a3a29. Read the comment docs.

rabernat commented 3 years ago

For context, the tests are currently failing due to the pre-commit mypy failure described above. That's the only blocker here.

rabernat commented 3 years ago

I just put this through its paces on google cloud, and I'm satisfied it is working ok for real world use cases. I'm going to merge. It would be great if other users (maybe @rsignell-usgs?) could take this for a spin by installing rechunker from github master.