pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License
163 stars 25 forks source link

Alternative scheduling systems (beyond dask) #29

Closed shoyer closed 4 years ago

shoyer commented 4 years ago

I saw this mentioned in Ryan's blog post but didn't see an issue, so I thought I would start one!

My rough thought is that this should start by factoring out the dask specific details from api.py into a separate Executor class, e.g., Rechunked should use composition (eventually duck typing) rather than inheritance from dask.Delayed object. It looks like the hard dependency on dask could be removed without much difficulty.

In terms of what the Executor needs, I guess it actually comes down to a single-method for copying from one list of zarr arrays to another, along with some way of supporting chaining. Perhaps:

executor.copy_arrays(source, intermediate)
executor.copy_arrays(intermediate, destination)
executor.execute()

or maybe just a single function would suffice?

# this is a little nicer since the public API doesn't involve mutation
rechunked = staged_copy([source, intermediate, destination])
rechunked.execute()

where staged_copy could how be passed as a user supplied function into the top level rechunk(). We could supply some default options bundled in Rechunker itself, e.g., the default value which would use Dask.

rabernat commented 4 years ago

It looks like the hard dependency on dask could be removed without much difficulty.

This is definitely the design I had in mind. The dask dependency was more just a convenience to get an MVP released. The core algorithm is in the algorithm module.

What you sketched out looks reasonable, but I'm not sure I follow everything. What would a staged_copy implementation look like? Where are the individual tasks in this design? I would definitely seek @TomAugspurger's input here.

My vision for how to generalize execution was to have some generators that spit out serializable python functions for each task in the rechunking operation. These could be executed with dask or any other execution framework. Is that compatible with what you're thinking?

@tomwhite also has some experiments that make rechunker work with pywren, although still via dask: https://github.com/tomwhite/dask-executor-scheduler/commit/343882c14fc7bd4010bc7c77b20e8f65c5ea2f44

shoyer commented 4 years ago

My vision for how to generalize execution was to have some generators that spit out serializable python functions for each task in the rechunking operation. These could be executed with dask or any other execution framework. Is that compatible with what you're thinking?

I'm sure these are compatible. Is the API you were thinking of closer to execute([copy_source_to_int_tasks, copy_int_to_dest_tasks])?

My proposal is a little higher level, which has pluses and minus:

Even for dask, I think this might be a little easier. The implementation of staged_copy() for dask would look something like:

def copy_arrays(source, dest):
  return dask.array.store(dask.array.from_zarr(source), dest, compute=False)

def staged_copy(arrays):
  for sources, destinations in zip(arrays[:-1], arrays[1:]):
    for source, dest in zip(sources, destinations):
      copy_arrays(source, dest)  # plus book-keeping on the task graph

The lower level version would require rewriting the copy_arrays utility function -- or maybe just using the dask.array functions and pulling out the compute graph?

TomAugspurger commented 4 years ago

@shoyer is this good to close?

shoyer commented 4 years ago

Yes, I think we have a good version of this working now :)