pangeo-data / pangeo

Pangeo website + discussion of general issues related to the project.
http://pangeo.io

xesmf included in pangeo.pydata.org ? #309

Closed naomi-henderson closed 6 years ago

naomi-henderson commented 6 years ago

Hi all, I have put together a use case for reproducing the canonical IPCC AR5 Figure 12.5 using the CMIP5 multi-model surface temperature data. I have converted the needed netcdf files to zarr and uploaded them to pangeo.pydata.org. I just noticed that the xesmf package has not been included, so I can't do the fast and easy spatial regridding provided by xesmf to combine all models on the same grid. See: use case on pangeo.pydata.org.
Any chance we can add xesmf to the standard packages?

Here is the figure this notebook can reproduce:

[Image: wgi_ar5_fig12-5]

rabernat commented 6 years ago

This is a great point @naomi-henderson. (FYI, we can't follow the link http://pangeo.pydata.org/user/naomi-henderson/lab/tree/CMIP5-MakeFigure12.5.ipynb. If you would like to share a notebook from pangeo, your best bet right now is to download it and upload it as a gist.)

We have discussed xesmf use cases in #197, but your example is much more concrete. It would be a great example to highlight the capabilities of pangeo to the climate community.

The main roadblock right now is pangeo-data/helm-chart#29, which is holding us up from adding new packages to pangeo.pydata.org. Hopefully that will be resolved very soon, and we can move ahead with this.

naomi-henderson commented 6 years ago

Ah, thanks @rabernat - I will wait for this to be sorted out. I will also need xESMF to finish the atmospheric moisture budget use case. The ability to do regridding is essential for dealing with the CMIP5 (and soon-to-come CMIP6) data.

mrocklin commented 6 years ago

xesmf is now on the stock software environment on Pangeo. I believe that this is now resolved. I'm curious to see it used.

cc @JiaweiZhuang

naomi-henderson commented 6 years ago

@mrocklin fantastic! So far so good, xesmf is happily calculating the weight files and reusing them when appropriate. Thanks much

rabernat commented 6 years ago

I wonder how xesmf will work with distributed. Will it scatter the weight matrix to the workers?

JiaweiZhuang commented 6 years ago

Glad to see this! @naomi-henderson could you share your notebook (I still cannot access it)? Let me try to tweak it to support distributed.

naomi-henderson commented 6 years ago

That would be great, @JiaweiZhuang ! The notebook runs fine (under 3 minutes) on my linux box, but unfortunately gets bogged down at the beginning when reading the zarr data on pangeo.pydata.org. The xesmf regridding is so fast in comparison that I haven't had a chance to check how the workers are handling that part. (It is monthly data chunked (12, ny, nx) in (time, lat, lon) - is that the problem?) If you scroll down to the bottom of each gist notebook you will see that it takes 4x as long using 10 workers / 20 cores on pangeo.pydata.org as on my linux box (where the zarr-ed data is on an internal drive) with 2 threads.

If anyone has words of wisdom on the data reading/ startup delay issues, I am very open to suggestions and would very much like to get this working properly.

Here is my notebook (with output) configured/run on linux box

and here is the same notebook (with output) configured/run on pangeo.pydata.org

mrocklin commented 6 years ago

cc @martindurant , putting this on your backlog if I may. I suspect that you'll enjoy diving into performance issues here.

martindurant commented 6 years ago

I am just heading out for a weekend away, but if no one beats me to it, I can look early next week. My immediate thought is that GCS file listing and metadata reads must still be done from the client process, and there may be many of them - that is the most likely slow-down in the initial stage.

# ~time to list all files
%time data_fs.du(base_path, True, True)
CPU times: user 2.33 s, sys: 171 ms, total: 2.51 s
Wall time: 25.5 s
12336380536

# size in GB
_ / 2**30
11.489149682223797

# number of files
len(data_fs.walk(base_path))
23832

Can you estimate the per-node data throughput, perhaps by doing a trivial operation such as sum(), versus your local disc? Network data rates >100MB/s are typical, but your disc may be orders of magnitude faster. There may be an opportunity to eat the cost just once if the data can be persisted.
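
For example, something like the following gives a crude effective-throughput number (a sketch; ds and the ts variable are whatever your notebook already has loaded):

import time

nbytes = ds.ts.nbytes                 # total bytes that must be read
start = time.time()
ds.ts.sum().compute()                 # trivial reduction: forces a full read of the data
elapsed = time.time() - start
print(f"~{nbytes / elapsed / 2**20:.0f} MB/s effective throughput")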

naomi-henderson commented 6 years ago

Thanks for looking at this. I will be gone for the next week, but will do some simple benchmarking of GCS vs my local disc when I return. Thanks for showing how to get the time to list all files and get the number of files - very useful!

mrocklin commented 6 years ago

I'm curious about the practice of listing files from GCS. Are there ways that we might be able to get around this? I would expect this to become an increasingly large issue as our datasets increase in size. I've heard from others that you should avoid listing buckets generally.

martindurant commented 6 years ago

Quite some time ago now, we realised that it simply took far too long to list all the files in data-heavy buckets, so we moved to per-directory listing. This means, though, a new call for every directory, and many calls for a deeply nested set of directories such as zarr CDF structures seem to have. The following is the set of exists calls seen when accessing one of the standard pangeo datasets. There are repeats in the list, and many calls here are free, since listings are cached; however, apparently non-existent directories are also tried, the full tree is interrogated up front, and every single .zarray and .zgroup is opened and read - all small files (often < 1 kB), but each requiring a new HTTP call, and all of this happens in serial:

exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zattrs/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/elevation/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/elevation/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lat/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lat/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lon/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lon/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/mask/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/mask/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/pcp/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/pcp/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_max/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_max/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_mean/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_mean/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_min/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_min/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_range/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/t_range/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lat/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/lon/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/time/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_ipython_canary_method_should_not_exist_/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_ipython_canary_method_should_not_exist_/.zgroup',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_repr_mimebundle_/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_repr_mimebundle_/.zgroup',), kwargs={})

(Note that I have no idea what the _ipython_canary... or _repr_mimebundle_ entries are; they do not appear in any file listing as far as I can tell.)

All of this aside, the general rule is still to have data chunks large enough that the download outweighs the overheads from filesystem operations: for Newman there is 120 GB in 1,930 files; for CMIP5-ts, 11 GB in 23,800 files.

martindurant commented 6 years ago

@alimanfoo, the zarr storage of cdf-like nested group data allows access at any level in the hierarchy, which is very convenient, but I wonder whether there is any possibility of consolidating the group and attr files such that so many list and open operations would not be required at metadata inference time?

rabernat commented 6 years ago

... I wonder whether there is any possibility of consolidating the group and attr files such that so many list and open operations would not be required at metadata inference time?

👍 to this idea

for Newman there is 120 GB in 1,930 files; for CMIP5-ts, 11 GB in 23,800 files

@naomi-henderson: how were the cmip5 files chunked? Would it make sense to use larger chunks? Can we see the repr of one of the datasets?

martindurant commented 6 years ago

CMIP5-ts seems to contain many datasets in separate directories; here is one, pangeo-data/CMIP5-ts/CanESM2/rcp45, picked at random:

Dimensions:  (lat: 64, lon: 128, time: 3540)
Coordinates:
  * lat      (lat) float64 -87.86 -85.1 -82.31 -79.53 -76.74 -73.95 -71.16 ...
  * lon      (lon) float64 0.0 2.812 5.625 8.438 11.25 14.06 16.88 19.69 ...
  * time     (time) float64 552.5 553.5 554.5 555.5 556.5 557.5 558.5 559.5 ...
Data variables:
    ts       (time, lat, lon) float32 dask.array<shape=(3540, 64, 128), chunksize=(12, 64, 128)>
Attributes:
    start_date:  2006-01-16T12:00:00.000000000

and here is Newman:

Dimensions:    (ensemble: 9, lat: 224, lon: 464, time: 12054)
Coordinates:
  * lat        (lat) float64 25.06 25.19 25.31 25.44 25.56 25.69 25.81 25.94 ...
  * lon        (lon) float64 -124.9 -124.8 -124.7 -124.6 -124.4 -124.3 ...
  * time       (time) datetime64[ns] 1980-01-01 1980-01-02 1980-01-03 ...
Dimensions without coordinates: ensemble
Data variables:
    elevation  (ensemble, lat, lon) float64 dask.array<shape=(9, 224, 464), chunksize=(1, 224, 464)>
    mask       (ensemble, lat, lon) int32 dask.array<shape=(9, 224, 464), chunksize=(1, 224, 464)>
    pcp        (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
    t_max      (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
    t_mean     (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
    t_min      (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
    t_range    (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 287, 224, 464)>
Attributes:
    _ARRAY_DIMENSIONS:         {'ensemble': 9, 'lat': 224, 'lon': 464, 'time'...
    history:                   Version 1.0 of ensemble dataset, created Decem...
    institution:               National Center for Atmospheric Research (NCAR...
    nco_openmp_thread_number:  1
    references:                Newman et al. 2015: Gridded Ensemble Precipita...
    source:                    Generated using version 1.0 of CONUS ensemble ...
    title:                     CONUS daily 12-km gridded ensemble precipitati...
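
For scale, each ts chunk in the CanESM2 store above is well under a megabyte uncompressed (a quick back-of-envelope check):

# chunksize=(12, 64, 128), float32
chunk_bytes = 12 * 64 * 128 * 4
print(chunk_bytes / 2**20)   # ~0.375 MB per chunk, before compression
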
naomi-henderson commented 6 years ago

I chunked the cmip5 data by 12 months. I could certainly use larger chunks, say 20 years, but the run lengths of the models differ, so not all chunks would be equal. Note that this is a small subset of the available data, even for this simple 2-dimensional (lon, lat) variable, and I have only used one ensemble member per model. Ensemble members are run for different lengths of time, so they cannot be combined as in the Newman dataset. Even if we increase the chunk size by a factor of 20 (or do no chunking at all - in which case we might as well stick with netcdf, no?), we still have more files per gigabyte of data.

The 3-dimensional data (lon, lat and pressure level) will be better, but if we really want to be able to fly through heterogeneous CMIP-type data (which can't be combined with open_mfdataset), we need to be able to deal with these issues.

Would it be useful for me to rechunk the data, upload and try again?
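
For reference, the rechunking itself would just be something like this (a sketch; the paths here are placeholders):

import xarray as xr

ds = xr.open_dataset('ts_Amon_CanESM2_rcp45.nc')          # placeholder input file
ds = ds.chunk({'time': 120})                              # e.g. 120 months per chunk instead of 12
ds.to_zarr('CMIP5-ts/120chunk/CanESM2/rcp45', mode='w')   # placeholder output path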

alimanfoo commented 6 years ago

Re performance I can suggest a couple of possible avenues to explore, not mutually exclusive.

The first obvious suggestion is to investigate whether it is strictly necessary to locate and read all metadata files up front. If any of that could be delayed and then done within some parallel work then some wall time could be saved.

A second suggestion is to explore ways of making use of the fact that Zarr can use separate stores for data and metadata (that was @mrocklin's idea). All Zarr API calls support both a store and a chunk_store argument. If only store is provided then the same store is used for both metadata (.zarray, .zgroup, .zattrs) and data (chunks). However if chunk_store is provided then that is used for data and store is used for metadata.

There are a number of possible ways this could be leveraged. For example, data and metadata could both be stored on GCS as usual but in separate buckets, or separate directory paths within a bucket. This could reduce the cost of directory or bucket listing when locating metadata files. But you could do other things too. E.g., you could store the chunks on GCS as usual (one chunk per object), but invent some way of baking all metadata into a single GCS object, then write a custom Mapping to expose that as a store to Zarr, with the idea that it is read once via a single HTTP request and then cached by the client process. You could also use something other than GCS to store metadata, but leave chunks on GCS. Lots of possibilities, all you need to do is implement a Mapping interface.

Also, a final quick thought: this may not help at all, as it sounds like metadata is already being cached, but there is an LRUStoreCache in Zarr which can be used as a wrapper around either the metadata store or the data store (or both).
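
For instance (a sketch, following the gcsfs usage elsewhere in this thread; the cache size is arbitrary):

import gcsfs
import xarray as xr
import zarr

gcs = gcsfs.GCSFileSystem(token='anon')
gcsmap = gcsfs.GCSMap(gcs=gcs, root='pangeo-data/newman-met-ensemble')

# Keep up to ~256 MB of recently read objects in client memory, so the many
# small .zarray/.zgroup/.zattrs reads only hit GCS once each.
cached = zarr.LRUStoreCache(gcsmap, max_size=2**28)
ds = xr.open_zarr(cached)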

Happy to discuss further.

mrocklin commented 6 years ago

@alimanfoo would it be possible to optionally collect all metadata from everywhere within a Zarr dataset and bring it into a single file at the top level? This would have to be invalidated on any write operation that changed metadata, but in the write-once-read-many case it might be helpful.

alimanfoo commented 6 years ago

@mrocklin yes, you could do something like this now via the store/chunk_store API without any code or spec changes in Zarr. Assuming data is prepared on a local file system first then uploaded to GCS, steps could be something like...

(0) Build the Zarr files and upload to GCS as usual (already done).

(1) Decide how to pack metadata from multiple files into a single file. There are lots of ways you could do this; e.g., you could just build a big JSON document mapping the relative file paths to file contents:

{
    ".zgroup": {
        ...
    },
    "elevation/.zarray": {
        ...
    },
    "elevation/.zattrs": {
        ...
    },
    ...
}

(2) Write a script to crawl the directory structure for metadata files and build the combined metadata file.

(3) Upload the combined metadata file to GCS, put it at the root of the hierarchy under a resource name like ".zmeta" (or put it wherever you like).

(4) Implement a Mapping interface (e.g., called something like CombinedMetaStore) that can read keys out of a combined metadata file. Importantly this would read the whole combined file once and cache it.
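
A minimal sketch of steps (2) and (4), just to make this concrete (the ".zmeta" key and the CombinedMetaStore name are illustrative, not a fixed convention; the store is assumed to be a MutableMapping such as a GCSMap):

import json
from collections.abc import Mapping

def build_combined_metadata(store, metadata_key='.zmeta'):
    # One-off crawl: find all metadata keys in the store and pack them into one JSON object.
    suffixes = ('.zarray', '.zgroup', '.zattrs')
    combined = {key: json.loads(store[key]) for key in store if key.endswith(suffixes)}
    store[metadata_key] = json.dumps(combined).encode()

class CombinedMetaStore(Mapping):
    # Read-only store backed by a single combined-metadata object,
    # fetched once (one HTTP request) and then served from memory.
    def __init__(self, store, key='.zmeta'):
        self._meta = json.loads(store[key])

    def __getitem__(self, item):
        return json.dumps(self._meta[item]).encode()

    def __iter__(self):
        return iter(self._meta)

    def __len__(self):
        return len(self._meta)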

Then you could open a Zarr hierarchy with something like:

gcsmap = gcsfs.GCSMap('pangeo-data/newman-met-ensemble')
meta_store = CombinedMetaStore(gcsmap, key='.zmeta')
root = zarr.Group(store=meta_store, chunk_store=gcsmap)

For access via xarray the xarray API may have to be changed to pass through both store and chunk_store args.

N.B., this assumes data are read-only once in GCS, so there is nothing like invalidation of the combined metadata file. But for this use case I don't think that would be necessary.

Happy to elaborate if this isn't making sense yet.

Btw also happy to discuss spec changes and/or features to support this kind of thing natively in Zarr, but trying to exhaust all possibilities for hacking this without spec or code changes first.

martindurant commented 6 years ago

From the pangeo perspective, reading such a combined metadata file would presumably require (subtle) changes in xarray code.

alimanfoo commented 6 years ago

On the xarray side in theory all you'd have to do is allow a user to provide both store and chunk_store args and pass those through to zarr, although that's said without looking at the code.

One other thing I noticed: some of the exists calls look odd, e.g., I can't think why the following would be needed:

exists(args=('pangeo-data/newman-met-ensemble/.zattrs/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_repr_mimebundle_/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/_repr_mimebundle_/.zgroup',), kwargs={})

...or why an exists would get called on both array and group at the same path, e.g.:

exists(args=('pangeo-data/newman-met-ensemble/.zarray',), kwargs={})
exists(args=('pangeo-data/newman-met-ensemble/.zgroup',), kwargs={})

martindurant commented 6 years ago

Yeah, I can't say where those odd exists calls are coming from, but of course only files that are found are loaded, which can still be many small files. I suppose if .zarray doesn't exist, the code next tries .zgroup.

It should be pretty simple to make a function to collect all the group/array/attrs metadata into a single JSON blob and store it in the top-level directory, have xarray look for it and, if it is present, use it as the metadata store (with the original mapping as the chunk_store). The function would only be called rarely, by users who know the dataset is to be read-only. The function itself could live in xarray or in zarr, but the xarray code would need to change to, at a minimum, allow chunk_store or (better) look for the aggregated JSON file.

In the case in hand, where there are many small datasets not in a single group structure, all this may not make a massive difference.

martindurant commented 6 years ago

@naomi-henderson , there are two concrete things here that can be done: 1) implement zarr metadata consolidation, 2) rechunk your data into larger pieces. I intend to work on 1), but 2) can help you more immediately, even though the total file sizes are not very big, as you pointed out. Is there any other way in which I can help you to be more productive?

alimanfoo commented 6 years ago

On Tue, 26 Jun 2018 at 17:42, Martin Durant notifications@github.com wrote:

Yeah, I can't say where those odd exists calls are coming from, but of course only files that are found are loaded, which can still be many small files. I suppose if .zarray doesn't exist, the code next tries .zgroup.

It may be that zarr is doing more exists checks than it needs to, and/or more loading of metadata files than it needs to. If at any point you think it's worth digging into this, happy to look into it too. Would just need to capture the zarr API calls being made by user code and the corresponding list of calls being made at the storage layer.

It should be pretty simple to make a function to collect all the group/array/attrs metadata into a single JSON blob and store it in the top-level directory, have xarray look for it and, if it is present, use it as the metadata store (with the original mapping as the chunk_store). The function would only be called rarely, by users who know the dataset is to be read-only. The function itself could live in xarray or in zarr, but the xarray code would need to change to, at a minimum, allow chunk_store or (better) look for the aggregated JSON file.

FWIW initially I'd be inclined to just expose the chunk_store arg in xarray and leave everything else to user code for the moment, i.e., not yet fix on any convention for where the consolidated metadata file is stored or what format it uses. It would be good to have some flexibility to explore options.

In the case in hand, where there are many small datasets not in a single group structure, all this may not make a massive difference.

Yep, fair enough.

naomi-henderson commented 6 years ago

@martindurant I have rechunked the data to 120 months instead of 12, with better performance - well, at least enough that the openzarr step takes about the same time as the first reduction (computing 12-month averages).

The file system listing to find the models is negligible compared to the openzarr step.

Here are typical results, but note that the openzarr step for the smaller chunked case is very unpredictable, sometimes pausing for > 1 minute between reads of one model and the next (different models each time).

on pangeo.pydata.org with the default 20 workers:

### historical runs, chunk = 120 months

finding models:     
CPU times: user 194 ms, sys: 21 ms, total: 215 ms
Wall time: 1.72 s

openzarr:
CPU times: user 9.89 s, sys: 900 ms, total: 10.8 s
Wall time: 1min 29s

calculate annual means:
CPU times: user 39.4 s, sys: 6.04 s, total: 45.4 s
Wall time: 1min 26s

regridding:
CPU times: user 1.85 s, sys: 802 ms, total: 2.65 s
Wall time: 2.55 s

concatenating:
CPU times: user 254 ms, sys: 438 ms, total: 692 ms
Wall time: 654 ms

TOTAL: 
CPU times: user 51.6 s, sys: 8.21 s, total: 59.8 s
Wall time: 3min

### historical runs, chunk = 12 months

finding models:
CPU times: user 200 ms, sys: 18 ms, total: 218 ms
Wall time: 2.5 s

openzarr:
CPU times: user 10.8 s, sys: 866 ms, total: 11.7 s
Wall time: 1min 51s  (Varies from 1min 35s to 3min 7s, sometimes stopping for >1min)

annual means:
CPU times: user 1min 19s, sys: 10 s, total: 1min 29s
Wall time: 2min 41s

regridding:
CPU times: user 1.85 s, sys: 884 ms, total: 2.73 s
Wall time: 2.61 s

concatenating:
CPU times: user 258 ms, sys: 426 ms, total: 684 ms
Wall time: 641 ms

TOTAL:
CPU times: user 1min 32s, sys: 12.2 s, total: 1min 44s
Wall time: 4min 38s

For comparison (perhaps unfairly), on my linux machine (32 threads available):

### historical runs, chunk = 12 months

finding models:
CPU times: user 4.56 ms, sys: 3.95 ms, total: 8.51 ms
Wall time: 7.98 ms

openzarr:
CPU times: user 301 ms, sys: 34 ms, total: 335 ms
Wall time: 330 ms

annual means:
CPU times: user 5min 1s, sys: 11min 2s, total: 16min 4s
Wall time: 40.6 s

regridding:
CPU times: user 1.37 s, sys: 647 ms, total: 2.02 s
Wall time: 2 s

concatenating:
CPU times: user 439 ms, sys: 1.26 s, total: 1.7 s
Wall time: 170 ms

TOTAL:
CPU times: user 5min 3s, sys: 11min 4s, total: 16min 8s
Wall time: 43.1 s

naomi-henderson commented 6 years ago

btw, why do the permissions on the .zgroup and .zattrs files, created by to_zarr, need to be so restrictive? Would it cause trouble if I allowed group/other read access?

zarr5/CMIP5-ts/ACCESS1-0/rcp85:
drwxrwxr-x 6 naomi naomi 4096 Jun 26 20:55 .
drwxrwxr-x 5 naomi naomi 4096 Jun 26 20:55 ..
drwxrwxr-x 2 naomi naomi 4096 Jun 26 20:55 lat
drwxrwxr-x 2 naomi naomi 4096 Jun 26 20:55 lon
drwxrwxr-x 2 naomi naomi 4096 Jun 26 20:55 time
drwxrwxr-x 2 naomi naomi 4096 Jun 26 20:55 ts
-rw------- 1 naomi naomi   53 Jun 26 20:55 .zattrs
-rw------- 1 naomi naomi   24 Jun 26 20:55 .zgroup
martindurant commented 6 years ago

why do the permissions on the .zgroup and .zattrs files need to be so restrictive?

That looks pretty odd, and I'm sure making them broader is fine. Indeed, there is an argument in general for making them all read-only for typical write-once datasets.

martindurant commented 6 years ago

I'd be inclined to just expose the chunk_store arg in xarray

@alimanfoo , that would of course be simple and should definitely be done, but in this case it would require at least a few lines of extra code to fetch the metadata upfront.

So any extra parameter to xarray's zarr functions could perhaps specify an arbitrary mutable mapping (i.e., pass the extra parameter through), a JSON key within the original mapping, or the previous behaviour. That now sounds like shoehorning too much into xarray, which is why I thought to make the POC in zarr first. Could these options (and the function to consolidate the metadata) live elsewhere, perhaps in intake-xarray?

mrocklin commented 6 years ago

Personally I would like to see this functionality be within either Zarr or XArray and not depend on a third package.

Ideally this is as low in the stack as makes sense.

mrocklin commented 6 years ago

To expand on that a bit. If we want Zarr to be used beyond just XArray then I think that we need to establish a convention for storing NetCDF-like data efficiently in Zarr. This convention is separate from any implementation in XArray or other package like intake. Ideally that convention is written up somewhere that other people in other language communities could implement independently from our technology stack.

I think that it might be reasonable to include the centralization of metadata storage in that convention.

Now, for our technology stack some package does need to implement that convention. I suspect that for us that is either Zarr or XArray. If XArray is the only library that's likely to actually use this then sure, put it there. But if other libraries like Iris might also want to use Zarr in the future then maybe it makes sense to centralize some of this logic in Zarr? Unfortunately I don't know enough about the organization of the Zarr and XArray-Zarr code to speak with any more detail than the generalities above.

martindurant commented 6 years ago

One example of the metadata speedup to be expected from consolidating the zarr metadata into a single file, here run locally against one of @naomi-henderson's datasets on remote GCS:

# original
In [3]: %%time
   ...: gcs = gcsfs.GCSFileSystem(token='anon')
   ...: mapper = gcsfs.GCSMap(gcs=gcs, root='mdtemp/CMIP5-ts/CanESM2/rcp45/')
   ...: d = xr.open_zarr(mapper)
   ...:
CPU times: user 221 ms, sys: 36.3 ms, total: 258 ms
Wall time: 5.71 s

# with consolidation and small hack in zarr to use it
In [3]: %%time  
   ...: gcs = gcsfs.GCSFileSystem(token='anon')
   ...: mapper = gcsfs.GCSMap(gcs=gcs, root='mdtemp/CMIP5-ts/CanESM2/rcp45/')
   ...: d = xr.open_zarr(mapper)
   ...:
CPU times: user 112 ms, sys: 28.1 ms, total: 140 ms
Wall time: 1.62 s

martindurant commented 6 years ago

For the interested, the extra code in zarr looks like this

--- a/zarr/hierarchy.py
+++ b/zarr/hierarchy.py
@@ -92,6 +92,14 @@ class Group(MutableMapping):
     def __init__(self, store, path=None, read_only=False, chunk_store=None,
                  cache_attrs=True, synchronizer=None):

+        try:
+            import json
+            metadata = json.loads(store['.zmetadata'])
+            meta_store = {k: v.encode() for k, v in metadata.items()}
+            chunk_store, store = store, meta_store
+        except (KeyError, ValueError, json.JSONDecodeError):
+            pass
+

(perhaps this should be in https://github.com/zarr-developers/zarr/pull/268 , but even if it ends up in zarr, this is probably not the final version)

alimanfoo commented 6 years ago

On Wed, 27 Jun 2018 at 14:14, Matthew Rocklin notifications@github.com wrote:

To expand on that a bit. If we want Zarr to be used beyond just XArray then I think that we need to establish a convention for storing NetCDF-like data efficiently in Zarr. This convention is separate from any implementation in XArray or other package like intake. Ideally that convention is written up somewhere that other people in other language communities could implement independently from our technology stack.

I think that it might be reasonable to include the centralization of metadata storage in that convention.

I agree. My comments were only to suggest that we don't fix too quickly on what that convention is, that we allow a little time for experimentation, and so for now we only put in whatever minimal changes are required to make it easy to experiment.

Now, for our technology stack some package does need to implement that convention. I suspect that for us that is either Zarr or XArray. If XArray is the only library that's likely to actually use this then sure, put it there. But if other libraries like Iris might also want to use Zarr in the future then maybe it makes sense to centralize some of this logic in Zarr? Unfortunately I don't know enough about the organization of the Zarr and XArray-Zarr code to speak with any more detail than the generalities above.

I think this will be useful beyond xarray so would be happy to see something go into zarr ultimately.

alimanfoo commented 6 years ago

@martindurant cool. Btw, I have been meaning to ask: does anyone know what the latency is for making requests to GCS from within Google cloud (same region)? I.e., what's the fixed cost of retrieving an object, independent of its size? It would be useful to know (in addition to the bandwidth) to help tune chunk sizes.

martindurant commented 6 years ago

@alimanfoo , minimum 100 ms, maybe double that even for trivial tasks. Time is taken to set up SSL connections (should be rare - sessions should be reused), for Google to actually perform the action (such as a file listing), and to transfer any data. Using 'anon' auth may be marginally faster, I think because GCS doesn't need to do verification.

alimanfoo commented 6 years ago

@martindurant thanks. And bandwidth from GCS to a compute node is around 100 MB/s?

Actually just found this from google's HPC docs:

Leverage the cost and bandwidth of Cloud Storage instead of running your own parallel file system. Cloud Storage provides strong consistency and scalable parallel performance with low overall cost. While the first-byte latency is high at roughly 100 ms, applications that can leverage Cloud Storage instead of running a parallel file server on Compute Engine are more cost effective. The available bandwidth between Cloud Storage and the compute nodes is sufficient for many applications, some customers have reported sustained aggregate bandwidth of over 23 GB/s.

They say 23 GB/s is aggregate bandwidth, I think that means between GCS and a large cluster. Struggling to find official docs on bandwidth between GCS and a single node.

martindurant commented 6 years ago

100 MB/s is what we used to say for AWS/S3, and anecdotally GCS was a bit better, but the same order of magnitude.

alimanfoo commented 6 years ago

@martindurant thanks. So that suggests a rule of thumb: chunk shape should ideally be such that each chunk is at least 10 MB compressed; smaller than that, and read time for a single array will be dominated by latency. That is separate from the metadata issue, of course - consolidating metadata will always help.
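
That rule of thumb is just the latency and bandwidth figures above multiplied together (a back-of-envelope check):

# break-even chunk size: transfer time equals first-byte latency
latency_s = 0.1              # ~100 ms per request
bandwidth_mb_s = 100.0       # ~100 MB/s per node
print(latency_s * bandwidth_mb_s)   # 10.0 MB -- below this, reads are latency-dominated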

rabernat commented 6 years ago

FYI, I have been aiming to have my chunks around 20MB. This size matches well with the use cases we have.

Larger chunks would be advantageous for several reasons, particularly in reducing the number of dask tasks. However, the tradeoff is that random small reads (i.e. browsing) become more expensive and memory usage goes up.

alimanfoo commented 6 years ago

@rabernat that sounds very reasonable.

I'm sure everyone knows this very well already, but just to reiterate: it's always worth checking the compressed size of chunks, as the compression ratio can vary a lot between datasets. E.g., we have some datasets where you get a 70x compression ratio, others where the ratio is only 2x or 3x.

martindurant commented 6 years ago

^ and, of course, it is generally more worthwhile attempting higher compression ratios for slower bandwidth storage - but that only affects the download time, not the connection overhead. I wonder if we should have a utility function that can estimate the disc size of some array for a few common compression configs and/or encodings.
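
A rough sketch of what such a utility could look like (the function name and codec choices here are illustrative; it compresses a sample of the array and scales the observed ratio up to the full size):

import numpy as np
import numcodecs

def estimate_compressed_sizes(arr, codecs=None, sample_bytes=2**24):
    # Compress a leading slice of the array (up to ~16 MB) and scale the ratio
    # to the full array -- only an estimate, since ratios can vary across the array.
    if codecs is None:
        codecs = {
            'lz4': numcodecs.Blosc(cname='lz4', clevel=1, shuffle=0),
            'zstd-bitshuffle': numcodecs.Blosc(cname='zstd', clevel=1, shuffle=2),
        }
    flat = np.ascontiguousarray(arr).ravel()
    n = max(1, min(flat.size, sample_bytes // flat.dtype.itemsize))
    sample = flat[:n]
    return {name: arr.nbytes * len(codec.encode(sample)) / sample.nbytes
            for name, codec in codecs.items()}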

alimanfoo commented 6 years ago

FWIW for maximum speed I use numcodecs.Blosc(cname='lz4', clevel=1, shuffle=0) but that tends to be optimal only when you have very high I/O (e.g., to a local SSD). For slower I/O settings I tend to use numcodecs.Blosc(cname='zstd', clevel=1, shuffle=...) where I will try out all three shuffle options (0 = no shuffle, 1 = byte shuffle, 2 = bit shuffle) on the data to see if they make a difference to compression ratio. For floating point data, shuffle probably won't do much unless combined with some kind of filter to fix the precision of the data, e.g., numcodecs.FixedScaleOffset or numcodecs.Quantize or I believe xarray has a built-in scale-offset. In general I've tended to benchmark a number of options for the datasets I most often work with.
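
As an illustration of wiring one of these choices into a zarr write via xarray (a sketch; the file names are placeholders and the codec is just one of the options above, not a recommendation):

import numcodecs
import xarray as xr

ds = xr.open_dataset('ts_Amon_CanESM2_rcp45.nc')   # placeholder input file

compressor = numcodecs.Blosc(cname='zstd', clevel=1, shuffle=2)
ds.to_zarr('ts_CanESM2_zarr', mode='w',
           encoding={'ts': {'compressor': compressor}})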

If the size of the network pipe on Google cloud is ~100 MB/s, then any compressor that decompresses faster than that is probably worth a try. The plot below was made on a particular type of genomic data and so may not translate to geo data, but it may give a general sense of relative performance for different compressors (the numbers next to each bar, like "61.1X", are the compression ratios):

[Image: compressor benchmark plot - decompression speed for various compressors, with compression ratios labelled on each bar]

martindurant commented 6 years ago

@naomi-henderson , is it useful for me to replicate your data to another GCS location and make consolidated metadata files, and point you to a version of zarr that would make use of them? That might solve your immediate problem and provide good motivation for the consolidation PR over on zarr. I can do this too, but I don't know when I would get to it. Note that I do not expect metadata consolidation to have much of an effect when working with local data.

naomi-henderson commented 6 years ago

Yes @martindurant , I think that would be very helpful - I am curious whether it is the repeated metadata reads or the latency that is the main issue here.

What do you need in order to consolidate the metadata, the original netcdf or the chunked zarr files?

By the way, I am currently using 120-month-chunked data (in pangeo-data/CMIP5-ts/120chunk/), but since the resolution varies from model to model, this does not guarantee a fixed chunk size. Note that I could make the chunks a more uniform size (e.g., @rabernat's 20 MB) if I allow the number of months to vary from model to model based on the resolution of the data (a rough recipe for that is sketched after the list below). Here are the chunk sizes if I use 120 months per chunk:

8.0M    ACCESS1-0/historical/ts/0.0.0
8.0M    ACCESS1-3/historical/ts/0.0.0
2.4M    bcc-csm1-1/historical/ts/0.0.0
15M         bcc-csm1-1-m/historical/ts/0.0.0
2.5M    BNU-ESM/historical/ts/0.0.0
2.5M    CanCM4/historical/ts/0.0.0
2.5M    CanESM2/historical/ts/0.0.0
16M         CCSM4/historical/ts/0.0.0
16M         CESM1-BGC/historical/ts/0.0.0
4.1M    CESM1-CAM5-1-FV2/historical/ts/0.0.0
16M         CESM1-CAM5/historical/ts/0.0.0
16M         CESM1-FASTCHEM/historical/ts/0.0.0
4.1M    CESM1-WACCM/historical/ts/0.0.0
1.3M    CMCC-CESM/historical/ts/0.0.0
28M      CMCC-CM/historical/ts/0.0.0
4.7M    CMCC-CMS/historical/ts/0.0.0
9.3M    CNRM-CM5-2/historical/ts/0.0.0
5.4M    CSIRO-Mk3-6-0/historical/ts/0.0.0
1.2M    CSIRO-Mk3L-1-2/historical/ts/0.0.0
2.3M    FGOALS-g2/historical/ts/0.0.0
4.1M    FGOALS-s2/historical/ts/0.0.0
2.4M    FIO-ESM/historical/ts/0.0.0
3.8M    GFDL-CM2p1/historical/ts/0.0.0
3.8M    GFDL-CM3/historical/ts/0.0.0
3.9M    GFDL-ESM2G/historical/ts/0.0.0
3.8M    GFDL-ESM2M/historical/ts/0.0.0
3.8M    GISS-E2-H-CC/historical/ts/0.0.0
3.8M    GISS-E2-H/historical/ts/0.0.0
3.8M    GISS-E2-R-CC/historical/ts/0.0.0
3.8M    GISS-E2-R/historical/ts/0.0.0
2.0M    HadCM3/historical/ts/0.0.0
8.0M    HadGEM2-AO/historical/ts/0.0.0
7.3M    HadGEM2-CC/historical/ts/0.0.0
7.3M    HadGEM2-ES/historical/ts/0.0.0
6.3M    inmcm4/historical/ts/0.0.0
2.8M    IPSL-CM5A-LR/historical/ts/0.0.0
6.0M    IPSL-CM5A-MR/historical/ts/0.0.0
2.8M    IPSL-CM5B-LR/historical/ts/0.0.0
53M         MIROC4h/historical/ts/0.0.0
9.2M    MIROC5/historical/ts/0.0.0
2.4M    MIROC-ESM-CHEM/historical/ts/0.0.0
2.4M    MIROC-ESM/historical/ts/0.0.0
4.7M    MPI-ESM-LR/historical/ts/0.0.0
4.7M    MPI-ESM-MR/historical/ts/0.0.0
4.7M    MPI-ESM-P/historical/ts/0.0.0
15M         MRI-CGCM3/historical/ts/0.0.0
15M         MRI-ESM1/historical/ts/0.0.0
4.1M    NorESM1-ME/historical/ts/0.0.0
4.1M    NorESM1-M/historical/ts/0.0.0
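
A rough way to pick a per-model chunk length targeting ~20 MB uncompressed (a sketch; the second grid size is illustrative):

def months_per_chunk(ny, nx, target_mb=20, bytes_per_value=4):
    # months of monthly float32 data that fit in roughly target_mb per chunk,
    # rounded down to whole years
    months = int(target_mb * 2**20 / (ny * nx * bytes_per_value))
    return max(12, 12 * (months // 12))

print(months_per_chunk(64, 128))    # CanESM2-like grid -> 636 months
print(months_per_chunk(180, 360))   # a finer 1-degree grid -> 72 months
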
martindurant commented 6 years ago

@naomi-henderson , you could install my zarr branch: pip install git+https://github.com/martindurant/zarr.git@consolidate_metadata (do this in a non-production environment!)

The function zarr.convenience.consolidate_metadata can be run on an existing mapping containing a zarr dataset, and the "hack" in Group.__init__ will look for the consolidated metadata key and use it if it exists. That's how I did my timings above, for the case of one dataset.

Of course, it would be best not to run the consolidation on any original data - only on stuff no one else will be using, or that can be easily re-made, should the process somehow break what is already there.
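
For example, something along these lines (a sketch; the exact signature on that branch may differ):

import gcsfs
import xarray as xr
import zarr

gcs = gcsfs.GCSFileSystem()   # needs write access for the consolidation step
mapper = gcsfs.GCSMap(gcs=gcs, root='mdtemp/CMIP5-ts/CanESM2/rcp45/')

# one-off: write the combined metadata key into the store (on a copy of the data)
zarr.convenience.consolidate_metadata(mapper)

# after which opening the dataset needs only a single metadata read
ds = xr.open_zarr(mapper)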

naomi-henderson commented 6 years ago

thanks @martindurant ! I think I need to down-rev my xarray? Your pip install worked, but then to_zarr throws the error:

NotImplementedError: Zarr version 2.2 or greater is required by xarray. See zarr installation http://zarr.readthedocs.io/en/stable/#installation

What version of xarray are you using? I am at '0.10.7'

martindurant commented 6 years ago

That zarr appears to have version '2.2.1.dev5+dirty', maybe xarray can't parse that... my xarray is 0.10.3.

naomi-henderson commented 6 years ago

ah, your zarr thinks it is 0.4.1.dev843 ?

martindurant commented 6 years ago

Huh, I have no idea where 0.4.1.dev843 comes from. You could instead grab the repo


> conda create -n testenv python zarr xarray gcsfs ipython -c conda-forge
> conda activate testenv
> git clone https://github.com/martindurant/zarr
> cd zarr
> git checkout consolidate_metadata
> python setup.py install
> ipython
In [ ]: import gcsfs
   ...: import xarray as xr

In [ ]: %%time
   ...: gcs = gcsfs.GCSFileSystem(token='anon')
   ...: mapper = gcsfs.GCSMap(gcs=gcs, root='mdtemp/CMIP5-ts/CanESM2/rcp45/')
   ...: d = xr.open_zarr(mapper)

mrocklin commented 6 years ago

@martindurant I suspect that if you push tags to your fork then this problem will go away. Sometimes with versioneer it bases the version on the most recent tag. If you don't have recent tags in your fork then your version can appear to be quite old.
