pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0

If a NetCDF file is chunked on disk, open it with compatible dask chunks #1440

Closed Zac-HD closed 10 months ago

Zac-HD commented 7 years ago

NetCDF4 data can be saved as chunks on disk, which has several benefits, including efficient reads when using a compatible chunk shape. This is particularly important for files with chunk-based compression (i.e. all nc4 files with compression) or on HPC and parallel file systems, where IO is typically dominated by the number of reads and chunks-from-disk are often cached. Caches are also common in network data backends such as THREDDS OPeNDAP, in which case using disk-compatible chunks will reduce cache pressure as well as latency.

Xarray can use chunks, of course, but as of v0.9 the chunk size has to be specified manually - and the easiest way to discover it is to open the file and look at the _Chunksizes attribute for each variable. I propose that xr.open_dataset (and array, and mfdataset) change their default behaviour.

If Dask is available and chunks=None (the default), chunks should be taken from the file on disk. This may lead to a chunked or unchunked dataset. To force an un-chunked load, users can specify chunks={}, or simply .load() the dataset after opening it.

shoyer commented 7 years ago

My main concern is that netCDF4 chunk sizes (e.g., ~10-100KB in that blog post) are often much smaller than well-sized dask chunks (10-100MB, per the Dask FAQ).

I do think it would be appropriate to issue a warning if you are making dask chunks that don't line up nicely with chunks on disk to avoid performance issues (in general each chunk on disk should usually end up on only one chunk in dask), but there are lots of options for aggregating to larger chunks and it's hard to choose the best way to do that without knowing how the data will be used.
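
A minimal sketch of such an alignment check, assuming both chunkings are given as plain dicts mapping dimension name to a uniform chunk length. The function names `misaligned_dims` and `warn_if_misaligned` are hypothetical and not part of xarray.

```python
import warnings


def misaligned_dims(dask_chunks, disk_chunks):
    """Return dimensions whose dask chunk length is not a whole multiple
    of the on-disk chunk length.

    Both arguments map dimension name -> chunk length (uniform chunks assumed).
    """
    bad = []
    for dim, disk_len in disk_chunks.items():
        dask_len = dask_chunks.get(dim)
        if dask_len is None:
            # Not chunked by dask: one block spans the dimension, so no
            # on-disk chunk is split along it.
            continue
        if dask_len % disk_len != 0:
            bad.append(dim)
    return bad


def warn_if_misaligned(dask_chunks, disk_chunks):
    """Emit a UserWarning listing misaligned dimensions, if any."""
    bad = misaligned_dims(dask_chunks, disk_chunks)
    if bad:
        warnings.warn(
            f"dask chunks do not align with on-disk chunks along {bad}; "
            "some on-disk chunks may be read by more than one dask chunk",
            UserWarning,
        )
    return bad
```

With uniform chunk lengths, block boundaries fall at multiples of the dask chunk length, so the modulo test is enough to show that no on-disk chunk straddles two dask chunks.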

jhamman commented 7 years ago

I'd certainly support a warning when dask chunks do not align with the on-disk chunks.

Beyond that, I think we could work on a utility for automatically determining chunk sizes for xarray using some heuristics. Before we go there though, I think we really should develop some performance benchmarks. We're starting to get a lot of questions/issues about performance and it seems like we need some benchmarking to happen before we can really start fixing the underlying issues.

shoyer commented 7 years ago

I think it's unavoidable that users understand how their data will be processed (e.g., whether operations will be mapped over time or space). But maybe some sort of heuristics (if not a fully automated solution) are possible.

For example, maybe chunks={'time'} (note the set rather than a dict) could indicate "divide me into automatically chosen chunks over the time dimension". It's still explicit about how chunking is being done, but comes closer to expressing the intent rather than the details.
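
One way to read the set hint, sketched under assumptions: split only along the hinted dimensions, leave every other dimension whole, and pick lengths so that one chunk stays under a byte budget. The helper name `chunks_from_hint` and the 64 MiB default are hypothetical choices for illustration, not anything xarray provides.

```python
import math


def chunks_from_hint(shape, dims, hint, itemsize, target_bytes=64 * 2**20):
    """Choose chunk lengths by splitting only along the hinted dimensions.

    shape, dims  : array shape and matching dimension names
    hint         : set of dimension names that may be split
    itemsize     : bytes per element
    target_bytes : assumed upper bound on the size of one chunk
    """
    sizes = dict(zip(dims, shape))
    unknown = set(hint) - set(sizes)
    if unknown:
        raise ValueError(f"hinted dimensions not in array: {sorted(unknown)}")

    chunks = dict(sizes)  # start with every dimension in a single piece
    for dim in dims:  # follow the array's own dimension order
        if dim not in hint:
            continue
        chunk_bytes = math.prod(chunks.values()) * itemsize
        if chunk_bytes <= target_bytes:
            break
        # Bytes in a slab of length 1 along this dimension.
        slab_bytes = chunk_bytes // chunks[dim]
        chunks[dim] = max(1, min(sizes[dim], target_bytes // slab_bytes))
    return chunks
```

If a single slab along the only hinted dimension is already larger than the budget, the sketch falls back to a chunk length of 1 there and the budget is exceeded.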

Zac-HD commented 7 years ago

I'd certainly support a warning when dask chunks do not align with the on-disk chunks.

This sounds like a very good idea to me 👍

I think it's unavoidable that users understand how their data will be processed (e.g., whether operations will be mapped over time or space). But maybe some sort of heuristics (if not a fully automated solution) are possible.

I think that depends on the size of the data - a very common workflow in our group is to open some national-scale collection, select a small (MB to low GB) section, and proceed with that. At this scale we only use chunks because many of the input files are larger than memory, and shape is basically irrelevant - chunks avoid loading anything until after selecting the subset (I think this is related to #1396).

It's certainly good to know the main processing dimensions though, and user-guided chunk selection heuristics could take us a long way - I actually think a dimension hint and good heuristics are likely to perform better than most users (who are not experts and have not profiled their performance).

The set notation is also very elegant, but I wonder about the interpretation. With chunks=, I specify how to break up the data - and any omitted dimensions are not chunked. For the hint, I'd expect to express which dimension(s) to keep - i.e. {'lat', 'lon'} should indicate that my analysis is mostly spatial, rather than mostly not. Maybe we could use a string (e.g. time for timeseries or lat lon for spatial) instead of a set to specify large chunk dimensions?

JanisGailis commented 7 years ago

We had a similar issue some time ago. We use xr.open_mfdataset to open long time series of data, where each time slice is a single file. In this case each file becomes a single dask chunk, which is appropriate for most data we have to work with (ESA CCI datasets).

We encountered a problem, however, with a few datasets that had very significant compression levels, such that a single file would fit in memory, but not a few of them, on a consumer-ish laptop. So, the machine would quickly run out of memory when working with the opened dataset.

As we have to be able to open all ESA CCI datasets 'automatically', manually denoting the chunk sizes was not an option, so we explored a few ways to do this. Aligning the chunk sizes with NetCDF chunking was not a great idea, for the reason shoyer mentions above. The chunk sizes for some datasets would be too small and the bottleneck moves from memory consumption to the number of read/write operations.

We eventually figured (with help from shoyer :)) that the chunks should be small enough to fit in memory on an average user's laptop, yet as big as possible to maximize the number of NetCDF chunks falling nicely in the dask chunk. The shape of the dask chunk can also matter for maximizing the number of NetCDF chunks falling nicely in. We figured it's a good guess to divide both lat and lon dimensions by the same divisor, as that's also how NetCDF is often chunked.

So, we open the first file, determine its 'uncompressed' size and then figure out if we should chunk it as 1, 2x2, 3x3, etc. It's far from a perfect solution, but it works in our case. Here's how we have implemented this: https://github.com/CCI-Tools/cate-core/blob/master/cate/core/ds.py#L506
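
The 1, 2x2, 3x3 search described above can be sketched roughly as follows. This is an illustration of the idea only, not the cate implementation behind the link; the function names and the byte budget are hypothetical.

```python
import math


def spatial_divisor(uncompressed_bytes, max_chunk_bytes):
    """Smallest n such that splitting lat and lon into n pieces each
    (n * n chunks per file) brings one chunk within max_chunk_bytes."""
    n = 1
    while uncompressed_bytes / (n * n) > max_chunk_bytes:
        n += 1
    return n


def lat_lon_chunks(lat_size, lon_size, n):
    """Chunk lengths for an n x n split; the last chunk along a dimension
    is shorter when the size is not divisible by n."""
    return {"lat": math.ceil(lat_size / n), "lon": math.ceil(lon_size / n)}
```

For example, a 900-byte array with a 100-byte budget needs a 3x3 split, and `lat_lon_chunks(1800, 3600, 3)` then gives chunks of 600 by 1200. Note that this sketch, like the approach it illustrates, does not check alignment with the NetCDF chunks on disk.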

Zac-HD commented 7 years ago

I love a real-world example 😄 This sounds pretty similar to how I'm thinking of doing it, with a few caveats - mostly that cate assumes that the data is 3D, has lat and lon, has a single time step, and has spatial dimensions wholly divisible by some small N. Obviously this is fine for CCI data, but not generally true of things Xarray might open.

Taking a step back for a moment, chunks are great for avoiding out-of-memory errors, faster processing of reorderable operations, and efficient indexing. The overhead is not great when data is small or chunks are small, it's bad when a single on-disk chunk is on multiple dask chunks, and very bad when a dask chunk includes several files. (of course all of these are generalisations with pathological cases, but IMO good enough to build some heuristics on)

With that in mind, here's how I'd decide whether to use the heuristic:

Having decided to use a heuristic, we know the array shape and dimensions, the chunk shape if any, and the hint if any:

It's probably a good idea to constrain this further, so that the ratio of chunk edge length along dimensions should not exceed the greater of 100:1 or four times the ratio of chunks on disk (I don't have universal profiling to back this up, but it's always worked well for me). This will mitigate the potentially-very-large effects of dimension order, especially in unchunked files or large chunks.
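
A small sketch of this edge-ratio constraint, under one reading of it: "ratio" is taken to mean longest edge divided by shortest edge, both for the dask chunk and for the chunk on disk. The helper names are hypothetical; the 100 and 4 come from the paragraph above.

```python
def max_edge_ratio(disk_chunks=None, floor=100, factor=4):
    """Largest allowed longest-to-shortest edge ratio for a dask chunk.

    disk_chunks : on-disk chunk shape, or None for an unchunked file
    floor       : the 100:1 limit
    factor      : multiplier applied to the on-disk chunk's own ratio
    """
    if not disk_chunks:
        return floor
    disk_ratio = max(disk_chunks) / min(disk_chunks)
    return max(floor, factor * disk_ratio)


def edge_ratio_ok(block_shape, disk_chunks=None):
    """True if the dask chunk shape respects the edge-ratio limit."""
    ratio = max(block_shape) / min(block_shape)
    return ratio <= max_edge_ratio(disk_chunks)
```

So a (5000, 10) dask chunk fails against an unchunked file (ratio 500 > 100) but passes when the on-disk chunks are (500, 1), where the limit becomes 4 * 500 = 2000.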

For datasets (as opposed to arrays), I'd calculate chunks once for the largest dtype and just reuse that shape.

JanisGailis commented 7 years ago

I quite like the approach you're suggesting! What I dislike the most currently with our approach is that it is a real possibility that a single netCDF chunk falls into multiple dask chunks, we don't control for that in any way! I'd happily swap our approach out to the more general one you suggest.

This does of course beg for input regarding the API constraints, as in, would it be a good idea to add more kwargs for chunk size threshold and edge ratio to open functions?

Zac-HD commented 7 years ago

🎉

My view is actually that anyone who can beat the default heuristic should just specify their chunks - you'd already need a good sense for the data and your computation (and the heuristic!). IMO, the few cases where tuning is desirable - but manual chunks are impractical - don't justify adding yet another kwarg to the fairly busy interfaces.

matt-long commented 7 years ago

I have encountered a related issue here. When I read a file with netCDF4 compression into a Dataset, a subsequent call to write the dataset using to_netcdf fails.

For instance, using data from the POP model, I can convert output to netCDF4 using NCO

$ ncks --netcdf4 --deflate 1 $file nc4-test.nc

Then in Python:

import xarray as xr

ds = xr.open_dataset('nc4-test.nc', decode_times=False, decode_coords=False)
ds.to_netcdf('test-out.nc')

The write fails with: "RuntimeError: NetCDF: Bad chunk sizes."

If I pass format='NETCDF3_64BIT', the write completes.

This seems like a bug.

jhamman commented 7 years ago

@Zac-HD - I'm about to put up a PR with some initial benchmarking functionality (#1457). Are you open to putting together a PR for the features you've described above? Hopefully, these two can work together.

As for the API changes related to this issue, I'd propose the following:

Use the chunks keyword to support 3 additional options

def open_dataset(filename_or_obj, ..., chunks=None, ...):
    """Load and decode a dataset from a file or file-like object.
    Parameters
    ----------
     ....
    chunks : int or dict or set or 'auto' or 'disk', optional
        If chunks is provided, it is used to load the new dataset into dask
        arrays. ``chunks={}`` loads the dataset with dask using a single
        chunk for all arrays.
    ...
    """ 
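
To make the argument types in this signature concrete, here is a hedged sketch of how such a `chunks` value could be classified before any chunking is computed. `chunking_mode` is a hypothetical helper, not xarray API, and the meanings attached to 'auto' and 'disk' in the comments are assumptions.

```python
def chunking_mode(chunks):
    """Classify a ``chunks`` argument (hypothetical helper, not xarray API)."""
    if chunks is None:
        return "no-dask"  # default: do not wrap arrays in dask
    if isinstance(chunks, str):
        if chunks in ("auto", "disk"):
            # Assumed meanings: 'auto' = heuristic, 'disk' = on-disk chunks.
            return chunks
        raise ValueError(f"unrecognised chunks string: {chunks!r}")
    if isinstance(chunks, dict):
        # Includes chunks={}, which is an empty dict, not an empty set.
        return "explicit"
    if isinstance(chunks, (set, frozenset)):
        return "hint"  # dimension names only, sizes chosen automatically
    if isinstance(chunks, int) and not isinstance(chunks, bool):
        return "explicit"  # same chunk length along every dimension
    raise TypeError(f"unsupported chunks argument: {chunks!r}")
```

One detail the sketch highlights: the literal `{}` is always a dict in Python, so `chunks={}` keeps its existing meaning and cannot be mistaken for an empty set hint.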

Zac-HD commented 7 years ago

@matt-long, I think that's a separate issue. Please open a new issue, including a link to data that will let us reproduce the problem.

@jhamman - [updated] I was always keen to work on this if I could make time, but have since changed jobs. However I'd still be happy to help anyone who wants to work on it with design and review.

I definitely want to preserve the exact present semantics of dict arguments (so users have exact control, with a warning if it's incompatible with disk chunks). I may interpret int arguments as a (deprecated) hint though, as that's what it's mostly used for, and will add a fairly limited hints API to start with - more advanced users can just specify exact chunks.

matt-long commented 7 years ago

@Zac-HD: Thanks, I submitted a separate issue: #1458

Zac-HD commented 7 years ago

I've just had a meeting at NCI which has helped clarify what I'm trying to do and how to tell if it's working. This comment is mostly for my own notes, and public for anyone interested. (I'll refer to dask chunks as 'blocks', as in 'blocked algorithms', and to netcdf chunks in a file as 'chunks', to avoid confusion.)

The approximate algorithm I'm thinking about is outlined in this comment above. Considerations, in rough order of performance impact, are:

Bottom line, I could come up with something pretty quickly but would prefer to take a little longer to write and explore some benchmarks.

jhamman commented 7 years ago

@Zac-HD - thanks for your detailed report.

Ping me again when you get started on some benchmarking, and feel free to chime in further on #1457.

No block should include data from multiple files (near-absolute, due to locking - though concurrent read is supported on lower levels?)

Hopefully we can find some optimizations that help with this. I routinely want to do this, though I understand why it's not always a good idea.

jhamman commented 7 years ago

@Zac-HD - We merged #1457 yesterday which should give us a platform to test any improvements we make related to this issue.

jhamman commented 6 years ago

cc @kmpaul who wanted to review this conversation.

stale[bot] commented 4 years ago

In order to maintain a list of currently relevant issues, we mark issues as stale after a period of inactivity

If this issue remains relevant, please comment here or remove the stale label; otherwise it will be marked as closed automatically

rabernat commented 4 years ago

We discussed this issue today in our pangeo coffee break. We think the following plan would be good:

Should we have an option like chunk_size='native', or chunk_size='100MB', with chunks chosen to align with source chunks?
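
A sketch of what "a target size, aligned with source chunks" could mean in practice: start from the on-disk chunk shape and grow it by whole on-disk chunks until the next step would exceed the byte budget. The function name `aligned_chunks` and the grow-the-shortest-edge-first strategy are assumptions for illustration; the budget is passed as a byte count rather than parsed from a string like '100MB'.

```python
import math


def aligned_chunks(shape, disk_chunks, itemsize, target_bytes):
    """Grow on-disk chunks by whole multiples while staying within target_bytes.

    Every returned length is either a multiple of the on-disk chunk length
    or the full dimension, so no on-disk chunk is split between blocks.
    """
    chunks = [min(c, s) for c, s in zip(disk_chunks, shape)]
    while True:
        candidates = []
        for axis, (c, d, s) in enumerate(zip(chunks, disk_chunks, shape)):
            if c >= s:
                continue  # already spans the whole dimension
            trial = list(chunks)
            trial[axis] = min(c + d, s)  # add one more on-disk chunk
            if math.prod(trial) * itemsize <= target_bytes:
                candidates.append((c, axis, trial))
        if not candidates:
            return tuple(chunks)
        # Grow the currently shortest edge first to keep blocks balanced.
        _, _, chunks = min(candidates)
```

If a single on-disk chunk is already larger than the budget, the sketch returns the on-disk chunk shape unchanged, which corresponds to the 'native' case.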

dcherian commented 4 years ago

should we have an option like chunk_size='native', or chunk_size='100MB'

Can we overload the chunks argument in open_xxx to do this? We are already adding support for chunks="auto" ...

rabernat commented 4 years ago

Can we overload the chunks argument in open_xxx to do this? We are already adding support for chunks="auto" ...

This gets tricky, because we may want slightly different behavior depending on whether the underlying array store is chunked.

kmpaul commented 4 years ago

@rabernat When you say "underlying array store", are you talking about the storage layer? That is, the zarr store or the netcdf file?

It seems to me that there are lots of "layers" of "chunking", especially when you are talking about chunking an entire dataset, which really confuses the whole issue. On an HPC system, there's filesystem blocksize, NetCDF/HDF5 "internal" chunks, chunking by spreading the data over multiple files, and in-memory chunks (i.e., Dask chunks). I'm not an expert on object store, but my understanding of object store is that (if you are storing NetCDF/HDF5 on object store) there is still "internal" NetCDF/HDF5 "chunking", then chunking over objects/files, and then in-memory chunking.

rabernat commented 4 years ago

It seems to me that there are lots of "layers" of "chunking", especially when you are talking about chunking an entire dataset,

To simplify a little bit, here we are only talking about reading a single store, i.e. one netcdf file or one zarr group. Also out of scope is the underlying storage medium (e.g. block size).

stale[bot] commented 2 years ago

In order to maintain a list of currently relevant issues, we mark issues as stale after a period of inactivity

If this issue remains relevant, please comment here or remove the stale label; otherwise it will be marked as closed automatically

lskopintseva commented 1 year ago

I have a netCDF file where variables are saved on disk in chunks, and I would like to read it using xr.open_dataset with the original chunks. Is there a way to do this in xarray? Since xarray is built on the netCDF4 library, I would expect this feature to be present in xarray as well.

jhamman commented 1 year ago

@lskopintseva - This feature has not been implemented in Xarray (yet). In the meantime, you might find something like this helpful:

import xarray as xr

ds = xr.open_dataset("dataset.nc")
for v in ds.data_vars:
    # get variable chunksizes 
    chunksizes = ds[v].encoding.get('chunksizes', None)
    if chunksizes is not None:
        chunks = dict(zip(ds[v].dims, chunksizes))
        ds[v] = ds[v].chunk(chunks)   # chunk the array using the underlying chunksizes

FWIW, I think this would be a nice feature to add to the netcdf4 and h5netcdf backends in Xarray. Contributions welcome!

jhamman commented 10 months ago

Epic! Thanks @mraspaud for finally knocking off this 6-year-old issue. 🎉