fsspec / kerchunk

Cloud-friendly access to archival data
https://fsspec.github.io/kerchunk/
MIT License

Issue reading zarr files with Dask distributed #153

Closed · dmcg closed this issue 2 years ago

dmcg commented 2 years ago

I'm not at all sure that this is kerchunk's problem to be honest, but would welcome a pointer.

I can kerchunk UK Met Office netCDF files and read them into xarray as zarr, but if I try to read them with Dask workers, the workers keep reading until they run out of memory and crash.

import json

import dask
from dask.distributed import Client
client = Client()

import fsspec
import matplotlib.pyplot as plt
import xarray

def open_fsspec_zarr(json_path):
    with open(json_path) as f:
        mapper = fsspec.get_mapper(
                "reference://",
                fo=json.load(f),
                remote_protocol="s3",
                remote_options={"anon": False})
        return xarray.open_dataset(mapper, engine='zarr', consolidated=False)

If we don't chunk the dataset, it works

dataset = open_fsspec_zarr('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.json')

%%time
slice = dataset.isel(height=5, realization=1)
slice

CPU times: user 659 µs, sys: 0 ns, total: 659 µs
Wall time: 636 µs

<xarray.Dataset>
Dimensions:                  (latitude: 960, longitude: 1280, bnds: 2)
Coordinates:
    forecast_period          timedelta64[ns] 1 days 18:00:00
    forecast_reference_time  datetime64[ns] 2021-11-07T06:00:00
    height                   float32 75.0
  * latitude                 (latitude) float32 -89.91 -89.72 ... 89.72 89.91
  * longitude                (longitude) float32 -179.9 -179.6 ... 179.6 179.9
    realization              float64 18.0
    time                     datetime64[ns] 2021-11-09
Dimensions without coordinates: bnds
Data variables:
    air_pressure             (latitude, longitude) float32 ...
    latitude_bnds            (latitude, bnds) float32 -90.0 -89.81 ... 90.0
    latitude_longitude       float64 nan
    longitude_bnds           (longitude, bnds) float32 -180.0 -179.7 ... 180.0
Attributes:
    Conventions:                  CF-1.7
    history:                      2021-11-07T10:27:38Z: StaGE Decoupler
    institution:                  Met Office
    least_significant_digit:      1
    mosg__forecast_run_duration:  PT198H
    mosg__grid_domain:            global
    mosg__grid_type:              standard
    mosg__grid_version:           1.6.0
    mosg__model_configuration:    gl_ens
    source:                       Met Office Unified Model
    title:                        MOGREPS-G Model Forecast on Global 20 km St...
    um_version:                   11.5

%time
fetched_slice = slice.to_array()[0,...,0]
plt.figure(figsize=(6, 6))
plt.imshow(fetched_slice, origin='lower')    

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.72 µs

<matplotlib.image.AxesImage at 0x7f4608324550>

If we chunk it, the work runs on the Dask workers, but they run out of memory and crash the whole box.

chunked_dataset = dataset.chunk('auto')

%%time
slice = chunked_dataset.isel(height=5, realization=1)
slice

%time
fetched_slice = slice.to_array()[0,...,0]
plt.figure(figsize=(6, 6))
plt.imshow(fetched_slice, origin='lower')    

My assumption is that the reference filesystem is not being properly communicated to the workers as a task graph?

rsignell-usgs commented 2 years ago

@dmcg, I'm guessing the memory issues you are having have to do with rechunking. When you do chunk('auto') you are actually rechunking the dataset, which is a challenging operation for Dask, and quite often blows memory. If you truly want to rechunk the data, consider using rechunker, which addresses the memory issue.

For virtual Zarr datasets created with kerchunk, however, we usually don't want to rechunk, but we do want to use the native chunking of the original files. You can achieve this by just adding chunks={} when you open the dataset.

So try:

def open_fsspec_zarr(json_path):
    with open(json_path) as f:
        mapper = fsspec.get_mapper(
                "reference://",
                fo=json.load(f),
                remote_protocol="s3",
                remote_options={"anon": False})
        return xarray.open_dataset(
                mapper, engine='zarr', chunks={},
                backend_kwargs=dict(consolidated=False))

That should definitely not blow memory!

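(For reference, a quick way to confirm what chunking actually gets picked up; a usage sketch, not from the thread, assuming the revised opener above:)

dataset = open_fsspec_zarr('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.json')
# With chunks={}, each dask chunk should map onto one stored zarr chunk.
print(dataset['air_pressure'].chunks)
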
dmcg commented 2 years ago

Ah, that makes a lot of sense, thanks...

... but leads to another issue, sigh.

The source netCDF files don't have any chunking (or rather, have one big chunk). When I open those locally with xarray.open_dataset('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.nc').chunk('auto') I get 5x4 chunks of lat, long, each 18 realisations and 33 heights.

These are successfully processed by workers without using too much memory. I assume each has to read the whole source chunk (or at least as far as is required to find its processing chunk), but can discard the bytes it doesn't need.

Maybe the zarr reading is not discarding the surplus data bytes but hanging onto them - perhaps this is s3fs?

martindurant commented 2 years ago

zarr and s3fs should be pretty good at reading exactly the chunks of bytes they need and then discarding them. While processing, you would expect to see memory spikes as the bytes are uncompressed and converted into the internal representation, plus any temporaries that the task requires.

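(A small sketch, not from the thread, assuming the reference JSON layout shown later in this discussion: each data chunk key maps to a [URL, offset, size] triple, which is exactly the byte range that gets requested.)

import json

with open('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.json') as f:
    refs = json.load(f)['refs']

# The value for a data chunk key is [url, offset, size]; that byte range is
# what s3fs would fetch for this chunk.
print(refs['air_pressure/0.0.0.0'])
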
rsignell-usgs commented 2 years ago

@dmcg, can you try using ncdump to check the chunking info? Xarray doesn't do a good job of figuring this out, and I'm just wondering if there really is some chunking we aren't seeing. Can you report back what this produces?

ncdump -hs /data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.nc

dmcg commented 2 years ago

I get

netcdf \000490262cdd067721a34112963bcaa2b44860ab {
dimensions:
        realization = 18 ;
        height = 33 ;
        latitude = 960 ;
        longitude = 1280 ;
        bnds = 2 ;
variables:
        float air_pressure(realization, height, latitude, longitude) ;
                air_pressure:standard_name = "air_pressure" ;
                air_pressure:units = "Pa" ;
                air_pressure:grid_mapping = "latitude_longitude" ;
                air_pressure:coordinates = "forecast_period forecast_reference_time time" ;
                air_pressure:_Storage = "contiguous" ;
        int latitude_longitude ;
                latitude_longitude:grid_mapping_name = "latitude_longitude" ;
                latitude_longitude:longitude_of_prime_meridian = 0. ;
                latitude_longitude:earth_radius = 6371229. ;
                latitude_longitude:_Endianness = "little" ;
        int realization(realization) ;
                realization:units = "1" ;
                realization:standard_name = "realization" ;
                realization:_Storage = "contiguous" ;
                realization:_Endianness = "little" ;
        float height(height) ;
                height:axis = "Z" ;
                height:units = "m" ;
                height:standard_name = "height" ;
                height:positive = "up" ;
                height:_Storage = "contiguous" ;
        float latitude(latitude) ;
                latitude:axis = "Y" ;
                latitude:bounds = "latitude_bnds" ;
                latitude:units = "degrees_north" ;
                latitude:standard_name = "latitude" ;
                latitude:_Storage = "contiguous" ;
        float latitude_bnds(latitude, bnds) ;
                latitude_bnds:_Storage = "contiguous" ;
        float longitude(longitude) ;
                longitude:axis = "X" ;
                longitude:bounds = "longitude_bnds" ;
                longitude:units = "degrees_east" ;
                longitude:standard_name = "longitude" ;
                longitude:_Storage = "contiguous" ;
        float longitude_bnds(longitude, bnds) ;
                longitude_bnds:_Storage = "contiguous" ;
        int forecast_period ;
                forecast_period:units = "seconds" ;
                forecast_period:standard_name = "forecast_period" ;
                forecast_period:_Endianness = "little" ;
        int64 forecast_reference_time ;
                forecast_reference_time:units = "seconds since 1970-01-01 00:00:00" ;
                forecast_reference_time:standard_name = "forecast_reference_time" ;
                forecast_reference_time:calendar = "gregorian" ;
                forecast_reference_time:_Endianness = "little" ;
        int64 time ;
                time:units = "seconds since 1970-01-01 00:00:00" ;
                time:standard_name = "time" ;
                time:calendar = "gregorian" ;
                time:_Endianness = "little" ;

// global attributes:
                :_NCProperties = "version=2,netcdf=4.8.1,hdf5=1.12.1" ;
                :history = "2021-11-07T10:27:38Z: StaGE Decoupler" ;
                :institution = "Met Office" ;
                :least_significant_digit = 1L ;
                :mosg__forecast_run_duration = "PT198H" ;
                :mosg__grid_domain = "global" ;
                :mosg__grid_type = "standard" ;
                :mosg__grid_version = "1.6.0" ;
                :mosg__model_configuration = "gl_ens" ;
                :source = "Met Office Unified Model" ;
                :title = "MOGREPS-G Model Forecast on Global 20 km Standard Grid" ;
                :um_version = "11.5" ;
                :Conventions = "CF-1.7" ;
                :_Format = "netCDF-4" ;
}

rsignell-usgs commented 2 years ago

@dmcg , darn. Indeed. No chunking.

martindurant commented 2 years ago

Does the above imply no compression? The kerchunk/zarr version would also tell you this, if the info dump isn't expected to show that detail.

dmcg commented 2 years ago

When I run the code with chunks={} but without a Dask client in scope, it runs successfully in the Jupyter server process, but peaks at about 3GiB. I suspect that if I give the workers that amount of memory it might complete when distributed, but if so it does suggest that they aren't discarding much of the 2.7GiB of data when they process their chunks?

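(A rough way to check that peak figure in-process; a sketch, not from the thread, using only the standard library on Linux:)

import resource

dataset = open_fsspec_zarr('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.json')
fetched = dataset.isel(height=5, realization=1).to_array()[0, ..., 0].values
# ru_maxrss is reported in KiB on Linux, so divide by 2**20 to get GiB.
print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 2**20)
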
rsignell-usgs commented 2 years ago

Each chunk is 3GB, so I think you are right - the workers would need at least 3GB.

realization = 18
height = 33
latitude = 960
longitude = 1280
print(realization * height * latitude * longitude * 4 / 1e9)   # chunk of 4-byte floats in GB
2.9

dmcg commented 2 years ago

The zarr file suggests no compression

air_pressure/.zarray: "{ "chunks": [ 18, 33, 960, 1280 ], "compressor": null, "dtype": "<f4", "fill_value": 9.969209968386869e+36, "filters": null, "order": "C", "shape": [ 18, 33, 960, 1280 ], "zarr_format": 2 }"

martindurant commented 2 years ago

OK, so we haven't done it before, but in the case of no compression we are completely free to choose the chunking along the largest axis. I wonder if direct netCDF access is getting this right already?

rsignell-usgs commented 2 years ago

@martindurant ooh! So in other words we could split these files into 18 * 33 different chunks, or 9 * 33, or 9 * 11 different chunks, etc., right?

martindurant commented 2 years ago

Yes, totally possible. I already had that in mind for netCDF3, which is not compressed internally and has that awkward "append" dimension. I didn't expect to find uncompressed netCDF4/HDF.

Separately, there is also an idea that zarr should pass the exact request on to the storage layer in the case of no compression (or compatible block-wise compression), so that you don't need to load whole chunks. That would need development in zarr, so it's a larger undertaking, but more universally useful.

dmcg commented 2 years ago

I've no idea why there is no compression or chunking in the source files, except maybe that this way gives the maximum flexibility for that sort of trick.

martindurant commented 2 years ago

Not arbitrary chunks: for original [ 18, 33, 960, 1280 ], you could have chunks

* [ 9, 33, 960, 1280 ]
* [ 6, 33, 960, 1280 ]
* [ 3, 33, 960, 1280 ]
* [ 1, 33, 960, 1280 ]
* [ 1, 11, 960, 1280 ]
* [ 1, 3, 960, 1280 ]

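(For reference, the per-chunk sizes these shapes imply, using the same 4-byte-float arithmetic as above; a sketch, not part of the original comment:)

for chunks in [(9, 33, 960, 1280), (6, 33, 960, 1280), (3, 33, 960, 1280),
               (1, 33, 960, 1280), (1, 11, 960, 1280), (1, 3, 960, 1280)]:
    n_values = 1
    for dim in chunks:
        n_values *= dim
    print(chunks, round(n_values * 4 / 1e9, 2), 'GB per chunk')
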
dmcg commented 2 years ago

Am I right in thinking though that something in the workers must be holding on to data that they don't require to be processing their current chunk, even discounting the fact that they could just do some sums and set range headers?

dmcg commented 2 years ago

Thanks for your help. When I'm feeling brave I'll try hand-editing the kerchunk .json file to represent the source as multiple chunks.

martindurant commented 2 years ago

Am I right in thinking though that something in the workers must be holding on to data that they don't require to be processing their current chunk, even discounting the fact that they could just do some sums and set range headers?

They need the 3GB of bytes and then an in-memory representation of the data. There is no way to get only part of this as things stand (until you edit the refs file!).

To edit, you will need to change the chunking in the .zarray key, and edit and add chunk keys. So for air_pressure, you would edit the "chunks" field of the "air_pressure/.zarray" key, and, looking at the "air_pressure/0.0.0.0" entry, you will see a [URL, offset, size] as the value. You will want to make keys like "air_pressure/i.0.0.0" with i from 0 to 17 (let's suppose you choose chunking of [ 1, 33, 960, 1280 ]), each having an offset of i*size relative to the original offset, where size is now the per-chunk size. You can edit just the one variable before trying others.

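(A minimal sketch of that procedure for air_pressure, splitting the single [18, 33, 960, 1280] chunk into 18 chunks of [1, 33, 960, 1280]; an illustration of the steps described above, assuming the reference JSON layout used elsewhere in this thread:)

import json

with open('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.json') as f:
    reference = json.load(f)
refs = reference['refs']

# 1. Rewrite the chunk shape in the variable's .zarray metadata.
zarray = json.loads(refs['air_pressure/.zarray'])
zarray['chunks'] = [1, 33, 960, 1280]
refs['air_pressure/.zarray'] = json.dumps(zarray)

# 2. Replace the single chunk key with 18 keys, each pointing at a contiguous
#    slice of the original byte range. This only works because the data is
#    stored uncompressed and contiguous.
url, offset, size = refs.pop('air_pressure/0.0.0.0')
per_chunk = size // 18
for i in range(18):
    refs[f'air_pressure/{i}.0.0.0'] = [url, offset + i * per_chunk, per_chunk]

# Write the edited references out (file name is arbitrary).
with open('edited.json', 'w') as f:
    json.dump(reference, f)
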
rsignell-usgs commented 2 years ago

@dmcg if you succeed with the hand editing, please report back! That would be super cool, and a kerchunk first! We have special prizes for that, right @martindurant ? 🙃

dmcg commented 2 years ago

They need the 3GB of bytes and then an in-memory representation of the data. There is no way to get only part of this as things stand (until you edit the refs file!).

This is the bit that I don't understand. Running the same 'algorithm' against the netCDF file

dataset = xarray.open_dataset('/data/metoffice/000490262cdd067721a34112963bcaa2b44860ab.nc').chunk('auto')
dataset

|       | Array               | Chunk              |
| ----- | ------------------- | ------------------ |
| Bytes | 2.72 GiB            | 69.61 MiB          |
| Shape | (18, 33, 960, 1280) | (18, 33, 192, 160) |
| Count | 41 Tasks            | 40 Chunks          |
| Type  | float32             | numpy.ndarray      |

slice = dataset.isel(height=5, realization=1)
fetched_slice = slice.to_array()[0,...,0]

gives very little memory used per worker. My model was that, even if there had been compression, each worker could have read and decompressed the entire file (or usually less), discarding all but the bytes that needed to be assembled to represent their current chunk. With no compression, each could just seek in the file to find the bits of the chunk? Is there any reason why zarr cannot assemble the xarray chunks in the same way?

rsignell-usgs commented 2 years ago

If you have very little memory used on each worker when you do chunk('auto'), then I guess it must be using byte ranges to extract subchunks from each chunk, right @martindurant ?
(Which engine is being used? e.g. netcdf4, h5netcdf, or ...)

martindurant commented 2 years ago

I don't know, this is up to the netCDF driver.

dmcg commented 2 years ago

I really appreciate the help, and at least am more convinced that if there is an issue, it is with zarr reading not kerchunk writing.

dmcg commented 2 years ago

So, for the record, I've had a try at rechunking, but started on a simpler version of the same file, with only one height and one realisation

netcdf observations {
dimensions:
        latitude = 960 ;
        longitude = 1280 ;
        bnds = 2 ;
variables:
        float air_pressure(latitude, longitude) ;
                air_pressure:_FillValue = NaNf ;
                air_pressure:standard_name = "air_pressure" ;
                air_pressure:units = "Pa" ;
                air_pressure:grid_mapping = "latitude_longitude" ;
                air_pressure:coordinates = "forecast_period forecast_reference_time time" ;
                air_pressure:_Storage = "contiguous" ;
        int latitude_longitude ;
                latitude_longitude:grid_mapping_name = "latitude_longitude" ;
                latitude_longitude:longitude_of_prime_meridian = 0. ;
                latitude_longitude:earth_radius = 6371229. ;
                latitude_longitude:coordinates = "time forecast_period height realization forecast_reference_time" ;
                latitude_longitude:_Endianness = "little" ;
        int realization ;
                realization:units = "1" ;
                realization:standard_name = "realization" ;
                realization:_Endianness = "little" ;
        float height ;
                height:_FillValue = NaNf ;
                height:axis = "Z" ;
                height:units = "m" ;
                height:standard_name = "height" ;
                height:positive = "up" ;
        float latitude(latitude) ;
                latitude:_FillValue = NaNf ;
                latitude:axis = "Y" ;
                latitude:bounds = "latitude_bnds" ;
                latitude:units = "degrees_north" ;
                latitude:standard_name = "latitude" ;
                latitude:_Storage = "contiguous" ;
        float latitude_bnds(latitude, bnds) ;
                latitude_bnds:_FillValue = NaNf ;
                latitude_bnds:coordinates = "time forecast_period height realization forecast_reference_time" ;
                latitude_bnds:_Storage = "contiguous" ;
        float longitude(longitude) ;
                longitude:_FillValue = NaNf ;
                longitude:axis = "X" ;
                longitude:bounds = "longitude_bnds" ;
                longitude:units = "degrees_east" ;
                longitude:standard_name = "longitude" ;
                longitude:_Storage = "contiguous" ;
        float longitude_bnds(longitude, bnds) ;
                longitude_bnds:_FillValue = NaNf ;
                longitude_bnds:coordinates = "time forecast_period height realization forecast_reference_time" ;
                longitude_bnds:_Storage = "contiguous" ;
        int forecast_period ;
                forecast_period:standard_name = "forecast_period" ;
                forecast_period:units = "seconds" ;
                forecast_period:_Endianness = "little" ;
        int64 forecast_reference_time ;
                forecast_reference_time:standard_name = "forecast_reference_time" ;
                forecast_reference_time:units = "seconds since 1970-01-01" ;
                forecast_reference_time:calendar = "gregorian" ;
                forecast_reference_time:_Endianness = "little" ;
        int64 time ;
                time:standard_name = "time" ;
                time:units = "seconds since 1970-01-01" ;
                time:calendar = "gregorian" ;
                time:_Endianness = "little" ;

// global attributes:
                :history = "2021-11-07T10:27:38Z: StaGE Decoupler" ;
                :institution = "Met Office" ;
                :least_significant_digit = 1L ;
                :mosg__forecast_run_duration = "PT198H" ;
                :mosg__grid_domain = "global" ;
                :mosg__grid_type = "standard" ;
                :mosg__grid_version = "1.6.0" ;
                :mosg__model_configuration = "gl_ens" ;
                :source = "Met Office Unified Model" ;
                :title = "MOGREPS-G Model Forecast on Global 20 km Standard Grid" ;
                :um_version = "11.5" ;
                :Conventions = "CF-1.7" ;
                :_NCProperties = "version=2,netcdf=4.8.1,hdf5=1.12.1" ;
                :_Format = "netCDF-4" ;
}

I wrote a kerchunked version, and then read the fsspec JSON back in

with open('observations.json', 'r') as f:
    zarr_dict = json.load(f)

I then synthesised two latitude chunks of 480 elements each by

zarray_dict = json.loads(zarr_dict['refs']['air_pressure/.zarray'])
zarray_dict

{'chunks': [960, 1280],
 'compressor': None,
 'dtype': '<f4',
 'fill_value': 'NaN',
 'filters': None,
 'order': 'C',
 'shape': [960, 1280],
 'zarr_format': 2}
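# Halve the latitude chunk length in the .zarray metadata and write it back: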
zarray_dict['chunks'] = [480, 1280]
zarr_dict['refs']['air_pressure/.zarray'] = json.dumps(zarray_dict)

and then the 'files' themselves

zarr_dict['refs']['air_pressure/0.0']

['{{u}}', 9977, 4915200]

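# Split the single [offset, size] byte range into two contiguous halves,
# one per 480-row latitude chunk: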
zarr_dict['refs']['air_pressure/0.0'] = ['{{u}}', 9977, int(4915200 / 2)]
zarr_dict['refs']['air_pressure/1.0'] = ['{{u}}', 9977 + int(4915200 / 2), int(4915200 / 2)]

Now that we have a mutated zarr_dict, we can read it and display it

revised_dataset = open_fsspec_zarr(zarr_dict)
plt.figure(figsize=(6, 6))
plt.imshow(revised_dataset.to_array()[0,...,0], origin='lower') 

You can't see this, but it is world-shaped!

I have to confess that my first attempt was not, and ended up with two hemispheres overlaid and stretched up and down to form the image. I'm also pretty sure that it will be harder to get right with the additional two dimensions in the original file, but as a proof of concept it is a start.

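(For the open_fsspec_zarr(zarr_dict) call above to work, the helper presumably was adapted to take the reference dict directly rather than a JSON path; a minimal sketch of such a variant, an assumption rather than the author's actual code:)

def open_fsspec_zarr(refs_dict):
    # fo= accepts an in-memory reference dict as well as a path to a JSON file.
    mapper = fsspec.get_mapper(
        "reference://",
        fo=refs_dict,
        remote_protocol="s3",
        remote_options={"anon": False})
    return xarray.open_dataset(
        mapper, engine='zarr', chunks={},
        backend_kwargs=dict(consolidated=False))
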
martindurant commented 2 years ago

Well done! What is especially interesting here is that you can do this "rechunking" after originally creating the references, without any need to rescan the original file or add additional arguments.

Agreed that we wouldn't normally want people to attempt this kind of thing by hand; code ought to be better at being systematically correct :)

NikosAlexandris commented 10 months ago

Not arbitrary chunks: for original [ 18, 33, 960, 1280 ], you could have chunks

* [ 9, 33, 960, 1280 ]

* [ 6, 33, 960, 1280 ]

* [ 3, 33, 960, 1280 ]

* [ 1, 33, 960, 1280 ]

* [ 1, 11, 960, 1280 ]

* [ 1, 3, 960, 1280 ]

What other algorithms for suggesting a good chunking shape for time series are out there? Can we roughly group some common read access patterns and come up with some recommendations? Or wouldn't it make sense to attempt listing some common use-cases?

martindurant commented 10 months ago

What other algorithms for suggesting a good chunking shape for time series are out there?

My list is complete (except even smaller chunks), these are the only possible ways to subchunk the original, given that chunks must be contiguous. If you are however rechunking, then you want something matching the fuzzy statement:

* as big as comfortably fits in memory, particularly along the axis(es) of interest.

Where "comfort" depends on what exactly the process is doing.

NikosAlexandris commented 10 months ago

What other algorithms for suggesting a good chunking shape for time series are out there?

My list is complete (except even smaller chunks), these are the only possible ways to subchunk the original, given that chunks must be contiguous. If you are however rechunking, then you want something matching the fuzzy statement:

* as big as comfortably fits in memory, particularly along the axis(es) of interest.

Where "comfort" depends on what exactly the process is doing.

Would you consider sizes that are multiples (or let me call it compatible!) of the file system's physical block size?

martindurant commented 10 months ago

Would you consider sizes that are multiples (or let me call it compatible!) of the file system's physical block size?


I doubt that matters. Remote storage doesn't have such a concept anyway, and local storage will be fast enough.