Open chuckwondo opened 1 year ago
As an example of one of the files:
>>> import xarray as xr
>>> from datetime import date
>>> url_pattern = (
... "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
... "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
... "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
... )
>>> url = url_pattern.format(time=date(1995, 8, 1))
>>> url
'https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/1995/08/ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-19950801000000-fv2.00.nc'
>>> ds = xr.open_dataset(f"{url}#mode=bytes", chunks="auto")
>>> ds
<xarray.Dataset>
Dimensions: (time: 1, lat: 18000, lon: 36000, length_scale: 1,
channel: 2)
Coordinates:
* time (time) datetime64[ns] 1995-08-01
* lat (lat) float32 -90.0 -89.99 -89.98 ... 89.97 89.98 89.99
* lon (lon) float32 -180.0 -180.0 -180.0 ... 180.0 180.0 180.0
* channel (channel) float32 11.0 12.0
Dimensions without coordinates: length_scale
Data variables: (12/14)
dtime (time, lat, lon) timedelta64[ns] dask.array<chunksize=(1, 2895, 5794), meta=np.ndarray>
satze (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
sataz (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
solze (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
solaz (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
lst (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
... ...
lst_unc_loc_atm (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
lst_unc_loc_sfc (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
lst_unc_sys (length_scale) float32 dask.array<chunksize=(1,), meta=np.ndarray>
lcc (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
n (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
lst_unc_loc_cor (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
Attributes: (12/41)
source: ESA LST CCI IRCDR L3S V2.00
title: ESA LST CCI land surface temperature time ser...
institution: University of Leicester
history: Created using software developed at Universit...
references: https://climate.esa.int/en/projects/land-surf...
Conventions: CF-1.8
... ...
geospatial_lon_resolution: 0.01
geospatial_lat_resolution: 0.01
key_variables: land_surface_temperature
format_version: CCI Data Standards v2.2
spatial_resolution: 0.01 degree
doi: 10.5285/785ef9d3965442669bff899540747e28
>>>
To possibly bypass the memory issue, I've attempted to use kerchunk instead, based upon the HDF Reference Recipe for CMIP6, but also to no avail:
import os
from tempfile import TemporaryDirectory
import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
CombineReferences,
OpenWithKerchunk,
WriteCombinedReference,
)
url_pattern = (
"https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
"MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
"ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
)
months = pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
urls = tuple(url_pattern.format(time=month) for month in months)
pattern = pattern_from_file_sequence(urls, "time", nitems_per_file=1).prune(2)
temp_dir = TemporaryDirectory()
target_root = temp_dir.name
store_name = "ceda-monthly-daytime-lst"
target_store = os.path.join(target_root, store_name)
os.mkdir(target_store)
transforms = (
beam.Create(pattern.items())
| OpenWithKerchunk(file_type=pattern.file_type)
| CombineReferences(
# Same error, regardless of inclusion/exclusion of "length_scale"
# concat_dims=["time"], identical_dims=["lat", "lon", "channel", "length_scale"]
concat_dims=["time"], identical_dims=["lat", "lon", "channel"]
)
| WriteCombinedReference(target_root=target_root, store_name=store_name)
)
print(f"{pattern=}")
print(f"{target_store=} (exists: {os.path.isdir(target_store)})")
print(f"{transforms=}")
with beam.Pipeline() as p:
p | transforms # type: ignore[reportUnusedExpression]
with xr.open_zarr(target_store) as ds:
print(ds)
Running this version results in the following, so I don't know if there's a way to eliminate the problematic length_scale
coordinate that lst_unc_sys
uses. Also, I don't understand why I'm seeing the UserWarning about the time coord:
$ time python recipe-kerchunk.py
pattern=<FilePattern {'time': 2}>
target_store='/var/folders/v_/q9ql2x2n3dlg2td_b6xkcjzw0000gn/T/tmpdt_pu6g5/ceda-monthly-daytime-lst' (exists: True)
transforms=<_ChainedPTransform(PTransform) label=[Create|OpenWithKerchunk|CombineReferences|WriteCombinedReference] at 0x10e458510>
.../python3.11/site-packages/kerchunk/combine.py:269: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [4.6008e+08]
...
.../python3.11/site-packages/kerchunk/combine.py", line 403, in second_pass
raise ValueError(
ValueError: Found chunk size mismatch:
at prefix lst_unc_sys in iteration 1 (file None)
new chunk: [1, 1]
chunks so far: [1]
real 12m39.490s
user 0m53.177s
sys 0m15.788s
@chuckwondo thanks for using Pangeo Forge and reporting back on your experience. There are definitely still some unpolished edges and these types of reports are invaluable for surfacing areas we need to work on more.
On first glance, what comes to mind for me is that when you call
with beam.Pipeline() as p:
p | transforms
Beam is presumably selecting the default local multithreaded runner (because no other runner is identified) which then attempts to fetch and open as many of your source files as possible in concurrent threads, which could easily end up being (far) in excess of 40GB depending on how big each source file is and how many of them there are. (Even if you have a modest number of threads, it's possible Beam is also attempting to dump opened input files into a serialized in-memory cache as it waits for the rest of the inputs to be opened.)
So a question for you: what is the aggregate size you expect the final zarr store to be? If < 40GB, then this does sound like a memory leak of some sort. If => 40GB, then this might be somewhat expected behavior for beam. In the latter case, we should figure out what distributed runner (Flink, GCP Dataflow, etc.) may make sense for this job. Generally speaking, for any large "production" build, we would recommend using a distributed runner, as the local runners are not built for reliably moving large amounts of data. Apologies that this is not currently clearer in the docs. I'm currently working on a docs revision which makes that more apparent.
@cisaacstern, thanks for the reply.
In the version of the code in my initial description, I've pruned the pattern to only 1 file:
# Prune to 1 element to minimize memory reqs for now
pattern = pattern_from_file_sequence(urls, "time", nitems_per_file=1).prune(1)
This is confirmed by the initial line of output:
pattern=<FilePattern {'time': 1}>
While each file is rather larger (~1.5G), I wouldn't expect operating on only 1 file to consume such an incredible amount of memory (40G+, perhaps even far more, while I wasn't looking).
Regarding your question about how large I expect the final zarr file to be, I'm not particularly familiar with the format to know how the size of the input files (1 file in this case) would translate into the size of the final zarr file.
However, I wouldn't expect processing a single 1.5G file to cause things to collapse even on a local runner, but I don't know what's happening under the covers.
I specifically pruned the pattern to 1 file (I originally used 2, which caused my original memory issue, so I pruned to only 1 to see if that would help, to no avail) to limit memory consumption as much as possible, just to confirm that I could produce sensible output before potentially submitting the recipe to the staged-recipes repo (if that's even the correct course of action), where I imagine it would be executed on a sufficiently large platform to process all of the files.
Do you have any further insights, or any suggestions?
I have reproduced this issue. Now investigating.
This is the problem
target_chunks={"time": 1, "lat": 5, "lon": 5},
You are trying to decimate this very large dataset into a tiny, tiny number of chunks. This is overwhelming beam with a vast number of tasks.
The original dataset dimensions are {'time': 1, 'lat': 18000, 'lon': 36000, 'length_scale': 1, 'channel': 2}
. So this pipeline is creating 25920000 different tasks (one for each chunk).
Beyond the inefficient pipeline, this Zarr with these tiny chunks would be extremely hard to use. We generally aim for chunks from 1-100 MB. The following chunks seemed to work for me.
target_chunks={"lat": 1800, "lon": 1800},
However, then I hit a new error
IndexError: too many indices for array; expected 1, got 4
which I swear I have seen before but can't remember where.
Ok, I got this pipeline working. Annotated changes below.
# set up a cache location
cache_dir = TemporaryDirectory()
cache_root = cache_dir.name
transforms = (
beam.Create(pattern.items())
# Caching the file means that we don't have to load each of the 200 chunks over the network
# (which in this case is a transatlantic hop)
# Instead we cache it to local file storage
# (or could use an in-region object store for a distributed pipeline)
| OpenURLWithFSSpec(cache=cache_root)
| OpenWithXarray(
file_type=pattern.file_type,
# This variable is problematic because it doesn't have the time dimension,
# so Pangeo Forge gets confused about how to deal with it.
# The easiest thing is to drop it. Could also promote it to a coordinate variable.
xarray_open_kwargs={"drop_variables": ["lst_unc_sys"]},
)
| StoreToZarr(
target_root=target_root,
store_name=store_name,
combine_dims=pattern.combine_dim_keys,
# approx 25 MB chunks
target_chunks={"lat": 1800, "lon": 1800},
)
)
This works. It's not particularly fast in this configuration, but it should parallelize well.
Fantastic! Thank you!
Yes, at one point I had also run into that IndexError
you mentioned. I don't recall how.
I had also tried larger values for lat
and lon
chunks, but I ran into buffer overflow errors. However, I was also including a time
chunk of 1
. I hadn't tried dropping it because I had read somewhere in the docs that the library would otherwise assume enough available memory to hold all files being processed.
Are we able to drop the time
chunk because time
is the concat dim?
Also, how did you determine that your proposed target chunks are ~25MB in size (I'm still a noob at all of this)?
Are we able to drop the
time
chunk becausetime
is the concat dim?
Correct.
Also, how did you determine that your proposed target chunks are ~25MB in size (I'm still a noob at all of this)?
1800 1800 8 bytes = 25.9 MB
FWIW, I would reduce the precision of this data to float 32. You almost never need 64 bits of precision for data analytics.
Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?
I'm going to try different chunk settings (to get me nearer to ~100MB -- isn't that the "magic" number, give or take?), particularly since I can't seem to get your code suggestion to work. I keep getting either aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
or fsspec.exceptions.FSTimeoutError
. Perhaps fiddling with the chunk size will get me past them.
Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?
Ok I see, yes this is definitely the case for the original file. For my recipe however, the target dataset was all float64. That's not good. 😕
I keep getting either
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
orfsspec.exceptions.FSTimeoutError
. Perhaps fiddling with the chunk size will get me past them.
Do you know if this timeout is coming from the upstream CEDA server?
Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?
Ok I see, yes this is definitely the case for the original file. For my recipe however, the target dataset was all float64. That's not good. 😕
I finally got a successful run. I used target_chunks={"lat": 3600, "lon": 7200}
and it took ~30 min. to complete.
I now see that the result is float64 that you mentioned, even though the original is float32. Is there any way for us to prevent this unwanted conversion?
I keep getting either
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
orfsspec.exceptions.FSTimeoutError
. Perhaps fiddling with the chunk size will get me past them.Do you know if this timeout is coming from the upstream CEDA server?
I suspect the problem may be on the CEDA server side. While using the CEDA OPeNDAP server during the course of some experimental work (which is what led me to look at writing this recipe, on the advice of @sharkinsspatial), we've experienced a great deal of flakiness.
I also see a few dozen of these 2 warnings during execution of the recipe:
.../python3.11/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
and
.../python3.11/site-packages/xarray/coding/times.py:618: RuntimeWarning: invalid value encountered in cast
int_num = np.asarray(num, dtype=np.int64)
Is there any way (or need) to address these?
Here's the current, successfully running code (running locally, pruned to 2 files), when the server isn't flaking out. I also managed to get a successful local run pruned to 10 files (just barely, as I saw memory spike to over 70G!), which led me to discover that there are a couple of gaps in the monthly files, so the date range is adjusted for those gaps.
import os
from pathlib import Path
from tempfile import TemporaryDirectory
import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
OpenWithXarray,
StoreToZarr,
)
# Monthly data spans from 1995-08 through 2020-12, with the exception of two
# gaps: (1) 1996-01 through 1996-06 and (2) 2001-02 through 2001-06.
months = (
pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
.difference(pd.date_range("1996-01", "1996-06", freq=pd.offsets.MonthBegin()))
.difference(pd.date_range("2001-02", "2001-06", freq=pd.offsets.MonthBegin()))
)
url_pattern = (
"https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
"MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
"ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
"#mode=bytes"
)
urls = tuple(url_pattern.format(time=month) for month in months)
pattern = pattern_from_file_sequence(urls, "time").prune(2)
target_root = str(Path().absolute())
store_name = "output.zarr"
target_store = os.path.join(target_root, store_name)
cache_dir = TemporaryDirectory()
cache_root = cache_dir.name
transforms = (
beam.Create(pattern.items())
| OpenURLWithFSSpec(cache=cache_root)
| OpenWithXarray(
file_type=pattern.file_type,
# The lst_unc_sys var doesn't have the time dimension, which causes a
# "chunk size mismatch" error, so we must drop it.
xarray_open_kwargs={"drop_variables": ["lst_unc_sys"], "chunks": "auto"},
)
| StoreToZarr(
target_root=target_root,
store_name=store_name,
combine_dims=pattern.combine_dim_keys,
target_chunks={"lat": 1800, "lon": 3600},
)
)
with beam.Pipeline() as p:
p | transforms # type: ignore[reportUnusedExpression]
with xr.open_zarr(target_store) as ds:
print(ds)
Now my question is, what do I need to do from here to make a good submission for the staged recipes repo?
In particular, I'm wondering about the following things (and I suspect there are other considerations that I'm not aware of):
@chuckwondo, way to push this to a working state! Responses inline:
Now my question is, what do I need to do from here to make a good submission for the staged recipes repo?
The deployment of recipes from staged-recipes
is not currently maintained. Do you have access to a GCP Dataflow and/or AWS account to deploy this to? The following is a current example of using our GitHub Action to deploy to GCP Dataflow (but could be adapted for AWS): https://github.com/pangeo-forge/aqua-modis-feedstock/blob/8cb545d20d4c08e49ec176a27da1dc48d4c2191d/.github/workflows/deploy.yaml#L31-L69
Since it appears you're at DevSeed, perhaps coordinating with @sharkinsspatial on deployment infra makes sense. Sean, where have you been deploying to lately?
How should I specify the target store location? I suspect this should be a cloud location, no? Same question for the cache directory.
As indicated in the GitHub Workflow example above, we recommend configuring this (and deploying) with the pangeo-forge-runner
CLI. The docs are lagging behind on this subject, and are currently my top priority, apologies for the lack of clarity here. One of the better reference points I can point to today is the integration tests on pangeo-forge-recipes
which (though a bit opaque, due to fixturing), do represent a fully tested use of the CLI to deploy to a local bakery: https://github.com/pangeo-forge/pangeo-forge-recipes/blob/c2927775cf0f992be4b93153028c345ca4f7c14c/tests/test_integration.py#L60.
I will aim to have better docs on this within a week or so, and will link them to you here ASAP.
Is there any way to eliminate the warnings mentioned in my previous comment? Should I even bother?
TBH, I'm not entirely clear what these mean so I'd ignore for now, assuming it doesn't appear to be affecting your output.
Is there any way to avoid the conversion from float32 to float64 mentioned in an earlier comment?
I'm not sure how to do this, if you're able to find a solution yourself, I'd be curious to know! Alternatively, could you open a dedicated Issue for this?
I'm following the PangeoForge recipe tutorial Xarray-to-Zarr Sequential Recipe: NOAA OISST to create a recipe for CEDA monthly daytime land surface temperature data, but I'm running into issues using pangeo-forge-recipes version 0.10.0 (obtained via conda-forge).
Here's my code in
recipe.py
(I'm using python 3.11):When I run this, it is eventually killed because it consumes an obscene amount of memory. I saw the python process exceed 40G of memory (on my 16G machine), but it may very well have gone beyond that while I wasn't watching it -- it ran for about 3.5 hours!:
I'm going to downgrade pangeo-forge-recipes to a version prior to the recently introduced breaking API changes to see if I encounter the same problem with the old API, but in the meantime, is there anything glaringly wrong with what I've written above that would cause the memory issue?