pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0
3.59k stars 1.08k forks source link

Integration with dask/distributed (xarray backend design) #798

Closed pwolfram closed 5 years ago

pwolfram commented 8 years ago

Dask (https://github.com/dask/dask) currently provides on-node parallelism for medium-size data problems. However, large climate data sets will require multiple-node parallelism to analyze large climate data sets because this constitutes a big data problem. A likely solution to this issue is integration of distributed (https://github.com/dask/distributed) with dask. Distributed is now integrated with dask and its benefits are already starting to be realized, e.g., see http://matthewrocklin.com/blog/work/2016/02/26/dask-distributed-part-3.

Thus, this issue is designed to identify the steps needed to perform this integration, at a high-level. As stated by @shoyer, it will

definitely require some refactoring of the xarray backend system to make this work cleanly, but that's OK -- the xarray backend system is indicated as experimental/internal API precisely because we hadn't figured out all the use cases yet."

To be honest, I've never been entirely happy with the design we took there (we use inheritance rather than composition for backend classes), but we did get it to work for our use cases. Some refactoring with an eye towards compatibility with dask distributed seems like a very worthwhile endeavor. We do have the benefit of a pretty large test suite covering existing use cases.

Thus, we have the chance to make xarray big-data capable as well as provide improvements to the backend.

To this end, I'm starting this issue to help begin the design process following the xarray mailing list discussion some of us have been having (@shoyer, @mrocklin, @rabernat).

Task To Do List:

pwolfram commented 8 years ago

The full mailing list discussion is at https://groups.google.com/d/msgid/xarray/CAJ8oX-E7Xx6NT4F6J8B4__Q-kBazoob9_qe_oFLi5hany9-%3DKQ%40mail.gmail.com?utm_medium=email&utm_source=footer

pwolfram commented 8 years ago

See also https://github.com/dask/dask/issues/922

mrocklin commented 8 years ago

Copying over a comment from that issue:

Yes, so the problem as I see it is that, for serialization and open-file reasons we want to use a function like the following:

def get_chunk_of_array(filename, datapath, slice):
    with netCDF4.Dataset(filename) as f:
        return f.variables[datapath][slice]

However, this opens and closes many files, which while robust, is slow. We can alleviate this by maintaining an LRU cache in a global variable so that it is created separately per process.

from toolz import memoize

cache = LRUDict(size=100, on_eviction=lambda file: file.close())

netCDF4_Dataset = memoize(netCDF4.Dataset, cache=cache)

def def get_chunk_of_array(filename, datapath, slice):
    f = netCDF4_Dataset(filename)
    return f.variables[datapath][slice]

I'm happy to supply the memoize function with toolz and an appropriate LRUDict object with other microprojects that I can publish if necessary.

We would then need to use such a function within the dask.array and xarary codebases.

Anyway, that's one approach. Thoughts welcome.

pwolfram commented 8 years ago

Here is an example of a use case for a nanmean over ensembles in collaboration with @mrocklin and following http://matthewrocklin.com/blog/work/2016/02/26/dask-distributed-part-3: https://gist.github.com/mrocklin/566a8d5c3f6721abf36f

pwolfram commented 8 years ago

@shoyer and @mrocklin, I've updated the summary above in the PR description with a to do list. Do either of you see any obvious tasks I missed on the list in the PR description? If so, can you please update the to do list so that I can see what needs done to modify the backend for the dask/distributed integration?

pwolfram commented 8 years ago

Repeating @mrocklin:

Dask.array writes data to any object that supports numpy style setitem syntax like the following:

dataset[my_slice] = my_numpy_array

Objects like h5py.Dataset and netcdf objects support this syntax.

So dask.array would work today without modification if we had such an object that represented many netcdf files at once and supported numpy-style setitem syntax, placing the numpy array properly across the right files. This work could happen easily without deep knowledge of either project.

Alternatively, we could make the dask.array.store function optionally lazy so that users (or xarray) could call store many times before triggering execution.

pwolfram commented 8 years ago

This issue of connecting to dask/distributed may also be connected with https://github.com/pydata/xarray/issues/463, https://github.com/pydata/xarray/issues/591, and https://github.com/pydata/xarray/pull/524.

mrocklin commented 8 years ago

I believe that robustly supporting HDF/NetCDF reads with the mechanism mentioned above will resolve most problems from a dask.array perspective. I have no doubt that other things will arise though. Switching from shared to distributed memory always come with (surmountable) obstacles

shoyer commented 8 years ago

I agree with @mrocklin that the LRUCache for file-like objects should take care of things from the dask.array perspective. It should also solve https://github.com/pydata/xarray/issues/463 in a very clean way. We'll just need to reorganize things a bit to make use of it.

pwolfram commented 8 years ago

Thanks @shoyer. If you can provide some guidance on bounds for the reorganization that would be really great. I want your and @jhamman's feedback on this before I try a solution. The trick is just to make the time, as always, and I may have some time this coming weekend.

pwolfram commented 8 years ago

Another note in support of this PR, especially "robustly support HDF/NetCDF reads": I am having problems with NetCDF: HDF error as previously reported by @rabernat in https://github.com/pydata/xarray/issues/463. Thus, a solution here will save time and may arguably be on the critical path of some workflows because fewer jobs will fail and require baby-sitting/restarts, especially when dealing with running multiple jobs.

mrocklin commented 8 years ago

FWIW I've uploaded a tiny LRU dict implementation to a new zict project (which also has some other stuff):

http://zict.readthedocs.org/en/latest/

pip install zict
from zict import LRU
d = LRU(100, dict())

There are a number of good alternatives out there though for LRU dictionaries.

pwolfram commented 8 years ago

Thanks @mrocklin! This has been really helpful and was what I needed to get going.

A prelim design I'm seeing is to modify the NetCDF4DataStore class https://github.com/pydata/xarray/blob/master/xarray/backends/netCDF4_.py#L170 to meet these requirements:

  1. At __init__, try to open file via the LRU cache. I think the LRU dict has to be a global because because the file restriction is an attribute of the system, correct?
  2. For each read from a file, ensure it hasn't been closed via a @ds.getter property method. If so, reopen it via the LRU cache. This is ok because for a read the file is essentially read-only. The LRU closes out stale entries to prevent the too many open file errors. Checking this should be fast.
  3. sync is only for a write but seems like it should following the above approach.

A clean way to do this is just to make sure that each time self.ds is called, it is re-validated via the LRU cache. This should be able to be implemented via property getter methods https://docs.python.org/2/library/functions.html#property.

Unless I'm missing something big, I don't think this change will require at large refactor but it is quite possible I overlooked something important. @shoyer and @mrocklin, do you see any obvious pitfalls in this scope? If not, it shouldn't be too hard to implement.

fmaussion commented 8 years ago

Sorry if I am just producing noise here (I am not a specialist), but I have two naive questions:

To 1. how will you handle concurrent access to the LRU cache if it's a global variable?

To 2. Once the file has been closed by the LRU, won't it also be erased from it? So that a simple if file in LRU: could suffice as a test if the file has been closed or not?

shoyer commented 8 years ago

I think the LRU dict has to be a global because because the file restriction is an attribute of the system, correct?

Correct, the LRU dict should be global. I believe the file restriction is generally per-process, and creating a global dict should assure that works properly.

For each read from a file, ensure it hasn't been closed via a @ds.getter property method. If so, reopen it via the LRU cache. This is ok because for a read the file is essentially read-only. The LRU closes out stale entries to prevent the too many open file errors. Checking this should be fast.

The challenge is that we only call the .get_variables() method (and hence self.ds) once on a DataStore when a Dataset is opened from disk. I think we need to refactor NetCDF4ArrayWrapper to take a filename instead, and use something like @mrocklin's netcdf_Dataset.

My bigger concern was how to make use of a method like futures_to_dask_arrays. But it looks like that may actually not be necessary, at least if we are happy to open all netCDF files (and read out the metadata) from a master process.

pwolfram commented 8 years ago

Just to be clear, we are talking about this https://github.com/mrocklin/hdf5lazy/blob/master/hdf5lazy/core.py#L83 for @mrocklin's netcdf_Dataset, right?

pwolfram commented 8 years ago

@fmaussion, for

  1. The LRU cache should be used serially for the read initially, but something more like @mrocklin's netcdf_Dataset appears to be needed as @shoyer points out. I need to think about this more.
  2. I was thinking we would keep track of the file name outside the LRU and only use the filename to open up datasets inside the LRU if they aren't already open. Agreed that if file in LRU should designate whether the file is open.
pwolfram commented 8 years ago

@shoyer, if if we are happy to open all netCDF files and read out the metadata from a master process that would imply that we would open a file, read the metadata, and then close it, correct?

Array access should then follow something like the @mrocklin's netcdf_Dataset approach, right?

shoyer commented 8 years ago

@pwolfram I was referring to this comment for @mrocklin's netCDF4_Dataset.

shoyer commented 8 years ago

@shoyer, if if we are happy to open all netCDF files and read out the metadata from a master process that would imply that we would open a file, read the metadata, and then close it, correct?

Array access should then follow something like the @mrocklin's netcdf_Dataset approach, right?

Yes, this is correct.

In principle, if we have a very large number of files containing many variables each, we might want to do the read in parallel using futures, and then use something like futures_to_dask_arrays to bring them together. That seems much trickier to integrate into our current backend approach.

mrocklin commented 8 years ago

It's probably best to avoid futures within xarray, so far they're only in the distributed memory scheduler. I think that ideally we create graphs that can be used robustly in either. I think that the memoized netCDF4_Dataset approach can probably do this just fine. Is there anything that is needed from me to help push this forward?

kynan commented 8 years ago

Has this issue progressed since?

Being able to distribute loading of files to a dask cluster and composing an xarray Dataset from data on remote workers would be a great feature.

Is @mrocklin's blog post from Feb 2016 still the reference for remote data loading on a cluster? Adapting it to loading xarray Datasets rather than plain arrays is not straightforward since there is no way to combine futures representing Datasets out of the box.

mrocklin commented 8 years ago

I haven't worked on this but agree that it is important.

pwolfram commented 8 years ago

@kynan, I'm still interested in this but have not had time to advance this further. Are you interested in contributing to this too?

I view this as a key component of future climate analysis workflows. This may also be something that is addressed at the upcoming hackathon at Columbia with @rabernat early next month.

Also, I suspect that both @mrocklin and @shoyer would be willing to continue to provide key advice because this appears to be aligned with their interests too (please correct me if I'm wrong in this assessment).

mrocklin commented 8 years ago

Definitely happy to support from the Dask side.

I think that the LRU method described above is feasible.

mrocklin commented 8 years ago

If XArray devs want to chat sometime I suspect we could hammer out a plan fairly quickly. My hope is that once a plan exists then a developer will arise to implement that plan. I'm free all of today and tomorrow.

pwolfram commented 8 years ago

@mrocklin, I would be happy to chat because I am interested in seeing this happen (e.g., eventually contributing code). The question is whether we need additional expertise from @shoyer, @jhamman, @rabernat etc who likely have a greater in-depth understanding of xarray than me. Perhaps this warrants an email to the wider list?

mrocklin commented 8 years ago

I agree that this conversation needs expertise from a core xarray developer. I suspect that this change is more likely to happen in xarray than in dask.array. Happy to continue the conversation wherever. I do have a slight preference to switch to real-time at some point though. I suspect that we can hash this out in a moderate number of minutes.

rabernat commented 8 years ago

This is a really important idea that has the potential to accelerate xarray from "medium data" to "big data". It should be planned out thoughtfully.

My view is that we should implement a new DataStore class to handle distributed datasets. This could live in the xarray backend, or it could be a standalone package. Such a data store could be the foundation of a powerful platform for big-climate-data analysis. (Or maybe I am thinking too ambitiously.)

I think the upcoming aospy workshop will be an ideal opportunity to discuss this, since many of the people on this thread will be face-to-face.

mrocklin commented 8 years ago

I agree that we should discuss it at the workshop. I also think it's possible that this could be accomplished by the right person (or combination of people) in a few hours. If so I think that we should come with it in hand as a capability that exists rather than a capability that should exist.

kynan commented 8 years ago

I'm probably not familiar enough with either the xarray or dask / distributed codebases to provide much input but would be happy to contribute if / where it makes sense. Would also be happy to be part of a some real-time discussion if feasible (based in the UK, so wouldn't be able to attend the workshop).

shoyer commented 8 years ago

I'm happy to help work out a plan here.

It seems like there are basically two steps we need to make this happen:

  1. Write the equivalent of futures_to_dask_arrays for xarray.Dataset, i.e., futures_to_xarray_datasets_of_dask_arrays.
  2. Integrate this into xarray's higher level utility functions like open_mfdataset. This should be pretty easy after we have futures_to_xarray_datasets_of_dask_arrays.

It's an open question to what extent this needs to interact with xarray's internal backends.DataStore API, which handles the details of decoding files on disk to xarray.Dataset objects. I'm hopeful the answer is "not very much". The DataStore API is a bit cumbersome and overly complex, and could use a refactoring.

mrocklin commented 8 years ago

The futures_to_dask_arrays function has been deprecated at this point. The standard way to produce a distributed dask.array from custom functions is as follows:

The same approach could be used with XArray except that presumably we would need to do this for every relevant dataset within the NetCDF file.

shoyer commented 8 years ago

@mrocklin OK, that makes sense. In that case, we might indeed need to thread this through xarray's backends.

Currently, backends open a file (e.g., with netCDF4.Dataset) and create an OrderedDict of xarray.Variable objects with lazy arrays that load from the file on demand. To load this data with dask, pass these lazy arrays into dask.array.from_array.

This currently doesn't use dask.delayed for three reasons:

  1. Historical: we wrote this system before dask existed.
  2. Performance: our LazilyIndexedArray class is still more selective than dask.array for subsetting data from large chunks, which is essential for many interactive use cases. Despite getitem fusing, dask will sometimes load complete chunks. This is particularly true if we do some transformation of the array, of the sort that could be accomplished with dask's map_blocks. Using LazilyIndexedArray ensures that this only gets applied to loaded data. There are also performance benefits to keeping files open when possible (discussed above).
  3. Dependencies: dask is still an optional dependency for xarray. I'd like to keep it that way, if possible.

It seems like a version of xarray's backends that doesn't always open files immediately would make it suitable for use in dask.distributed. So indeed, we'll need to do some serious refactoring.

One other thing that will need to be tackled eventually: xarray.merge and xarray.concat (used in open_mfdataset) still have some steps (checking for equality between arrays) that are applied sequentially. This is going to be a performance bottleneck when we start working with very large arrays. This really should be refactored such that dask can do these evaluations in a single step, rather than once per object. For now, this can be avoided in concat by using the data_vars/coords options.

mrocklin commented 8 years ago

We could possibly make an object that was API compatible with the subset of netCDF4.Dataset that you needed, but opened and closed the file whenever it actually pulled data. We would keep an LRU cache of open files around for efficiency as discussed earlier. In this case we could possibly optionally swap out the current netCDF4.Dataset object with this thing without much refactoring?

mrocklin commented 8 years ago

We seem to be making good progress here on the issue. I'm also happy to switch to real-time voice at any point today or tomorrow if people prefer.

shoyer commented 8 years ago

We could possibly make an object that was API compatible with the subset of netCDF4.Dataset that you needed, but opened and closed the file whenever it actually pulled data. We would keep an LRU cache of open files around for efficiency as discussed earlier. In this case we could possibly optionally swap out the current netCDF4.Dataset object with this thing without much refactoring?

Yes, this could work for a proof of concept.

In the long term, it would be good to integrate this into xarray so we can support alternative backends (e.g., h5netcdf, scipy, pynio, loaders for custom file formats like @rabernat and @pwolfram work with) in a fully consistent fashion without needing to make a separate wrapper for each.

mrocklin commented 8 years ago

One alternative would be to define custom serialization for netCDF4.Dataset objects.

I've been toying with the idea of custom serialization for dask.distributed recently. This was originally intended to let Dask make some opinionated serialization choices for some common formats (usually so that we can serialize numpy arrays and pandas dataframes faster than their generic pickle implementations allow) but this might also be helpful here to allow us to serialize netCDF4.Dataset objects and friends.

We would define custom dumps and loads functions for netCDF4.Dataset objects that would presumably encode them as a filename and datapath. This would get around the open-many-files issue because the dataset would stay in the worker's .data dictionary while it was needed.

One concern is that there are reasons why netCDF4.Dataset objects are not serializable (see https://github.com/h5py/h5py/issues/531). I'm not sure if this would affect XArray workloads.

kynan commented 7 years ago

For the case where NetCDF / HDF5 files are only available on the distributed workers and not directly accessible from the client, how would you get the necessary metadata (coords, dims etc.) to construct the xarray.Dataset?

mrocklin commented 7 years ago

You wouldn't

On Tue, Oct 25, 2016 at 9:43 AM, Florian Rathgeber <notifications@github.com

wrote:

For the case where NetCDF / HDF5 files are only available on the distributed workers and not directly accessible from the client, how would you get the necessary metadata (coords, dims etc.) to construct the xarray.Dataset?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/pydata/xarray/issues/798#issuecomment-256038226, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszIYbrYoqqJMwu5FFoxu5SWSJSTnoks5q3geGgaJpZM4H1p4q .

delgadom commented 7 years ago

Could this extend the OpeNDAP interface? That would solve the metadata problem and would provide quick access to the distributed workers.

mrocklin commented 7 years ago

We could pull data from OpenDAP. Actually computing on those workers would probably be hard to integrate. Distributed Dask.array could possibly replace OpenDAP in some settings though, serving not only data, but also computation.

shoyer commented 7 years ago

Distributed Dask.array could possibly replace OpenDAP in some settings though

Yes, this sounds quite promising to me.

Using OpenDAP for communication is also possible, but if all we need to do is pass around serialized xarray.Dataset objects using pickle or even bytes from netCDF files seems more promising.

mrocklin commented 7 years ago

I may have a solution to this in https://github.com/dask/distributed/pull/606, which allows for custom serialization formats to be registered with dask.distributed. We would register serialize and deserialize functions for the various netCDF objects. Something like the following might work for h5py:

def serialize_dataset(dset):
    header = {}
    frames = [dset.filename.encode(), dset.datapath.encode()]
    return header, frames

def deserialize_dataset(header, frames):
    filename, datapath = frames
     f = h5py.File(filename.decode())
    dest = f[datapath.decode()]
    return dset

register_serialization(h5py.Dataset, serialize_dataset, deserialize_dataset)

We still have lingering open files but not too many per machine. They'll move around the network, but only as necessary.

mrocklin commented 7 years ago

Custom serialization is in dask/distributed. This allows for us to build custom serialization solutions like the following for h5py.Dataset: https://github.com/dask/distributed/pull/620/files

Any concerns would be very welcome. Earlier is better.

shoyer commented 7 years ago

We have something very hacky working with https://github.com/pydata/xarray/pull/1095

I'm also going to see if I can get something working with the LRU cache, since that seems closer to the solution we want eventually.

mrocklin commented 7 years ago

FYI Dask is committed to maintaining this: https://github.com/dask/zict/blob/master/zict/lru.py

shoyer commented 7 years ago

One slight subtlety is writes. We'll need to switch from 'w' to 'a' mode the second time we open a file. On Tue, Nov 8, 2016 at 8:17 AM Matthew Rocklin notifications@github.com wrote:

FYI Dask is committed to maintaining this: https://github.com/dask/zict/blob/master/zict/lru.py

— You are receiving this because you were mentioned.

Reply to this email directly, view it on GitHub https://github.com/pydata/xarray/issues/798#issuecomment-259181856, or mute the thread https://github.com/notifications/unsubscribe-auth/ABKS1rz8sYoBXjMbJvQqrP3XHZx3_fJhks5q8KCRgaJpZM4H1p4q .

shoyer commented 7 years ago

A few other thoughts on thread safety with the LRU approach:

  1. We need to a global lock ensure internal consistency of the LRU cache, and so that we don't overwrite files without closing them. It probably makes sense to put this in memoize function.
  2. We need separate, per file locks, to ensure that we don't evict files in the process of reading or writing data from them (which would cause segfaults). As a stop-gap measure, we could simply refuse to evict files until we can acquire a lock, but more broadly this suggests that strict LRU is not quite right. Instead, we want to evict the least-recently-used unlocked item.
kynan commented 7 years ago

Great to see this moving! I take it the workshop was productive?

How does #1095 work in the scenario of a distributed scheduler with remote workers? Do I understand correctly that all workers and the client would need to see the same shared filesystem from where NetCDF files are read?