Open derekocallaghan opened 1 year ago
Here's the current draft of the recipe port, containing a new IcsWindSpeedDirection
transform, that appears to run successfully:
import apache_beam as beam
import dask
import dask.array as da
from dataclasses import dataclass, field
from datetime import datetime
from metpy.calc import wind_direction, wind_speed
import pandas as pd
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, prune_pattern
from pangeo_forge_recipes.transforms import _add_keys, OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
from tempfile import TemporaryDirectory
import xarray as xr
missing_dates = [
pd.Timestamp(d)
for d in [
"2017-03-11",
"2017-03-12",
"2017-04-10",
"2017-05-25",
"2017-05-26",
"2018-01-04",
"2020-07-09",
"2020-07-13",
"2020-07-15",
"2020-08-04",
"2020-08-09",
"2020-08-11",
"2020-10-22",
"2020-10-23",
]
]
dates = pd.date_range("2015-01-16", "2021-09-30", freq="D")
# Drop missing dates
dates = dates.drop(missing_dates)
def make_url(time):
year = time.strftime("%Y")
month = time.strftime("%m")
day = time.strftime("%d")
url = f"https://data.remss.com/ccmp/v02.1.NRT/Y{year}/M{month}/CCMP_RT_Wind_Analysis_{year}{month}{day}_V02.1_L3.0_RSS.nc"
return url
# Daily products with 6-hourly values
NITEMS_PER_FILE = 4
pattern = FilePattern(
make_url,
ConcatDim(name="time", keys=dates, nitems_per_file=NITEMS_PER_FILE),
)
pattern_pruned = prune_pattern(pattern)
def ics_wind_speed_direction(ds: xr.Dataset) -> xr.Dataset:
"""
Selects a subset for the Irish Continental Shelf (ICS) region, and computes wind speed and
direction for the u and v components in the specified product. Dask arrays are
created for delayed execution.
"""
# ICS grid
geospatial_lat_min = 45.75
geospatial_lat_max = 58.25
geospatial_lon_min = 333.85
geospatial_lon_max = 355.35
icds = ds.sel(
latitude=slice(geospatial_lat_min, geospatial_lat_max),
longitude=slice(geospatial_lon_min, geospatial_lon_max),
)
# Remove subset of original attrs as they're no longer relevant
for attr in ["base_date", "date_created", "history"]:
del icds.attrs[attr]
# Update the grid attributes
icds.attrs.update(
{
"geospatial_lat_min": geospatial_lat_min,
"geospatial_lat_max": geospatial_lat_max,
"geospatial_lon_min": geospatial_lon_min,
"geospatial_lon_max": geospatial_lon_max,
}
)
u = icds.uwnd
v = icds.vwnd
# Original wind speed 'units': 'm s-1' attribute not accepted by MetPy,
# use the unit contained in ERA5 data
ccmp_wind_speed_units = u.units
era5_wind_speed_units = "m s**-1"
u.attrs["units"] = era5_wind_speed_units
v.attrs["units"] = era5_wind_speed_units
variables = [
{
"name": "wind_speed",
"metpy_fn": wind_speed,
"attrs": {"long_name": "Wind speed", "units": ccmp_wind_speed_units},
},
{
"name": "wind_direction",
"metpy_fn": wind_direction,
"attrs": {"long_name": "Wind direction", "units": "degree"},
},
]
# CCMP provides u/v at a single height, 10m
for variable in variables:
icds[variable["name"]] = (
xr.DataArray(
da.from_delayed(
dask.delayed(variable["metpy_fn"](u, v).values), u.shape, dtype=u.dtype
),
coords=u.coords,
dims=u.dims,
)
.assign_coords(height=10)
.expand_dims(["height"])
)
icds[variable["name"]].attrs.update(variable["attrs"])
icds.height.attrs.update(
{
"long_name": "Height above the surface",
"standard_name": "height",
"units": "m",
}
)
# Restore units
for variable in ["uwnd", "vwnd"]:
icds[variable].attrs["units"] = ccmp_wind_speed_units
icds.attrs["eooffshore_zarr_creation_time"] = datetime.strftime(
datetime.now(), "%Y-%m-%dT%H:%M:%SZ"
)
icds.attrs[
"eooffshore_zarr_details"
] = "EOOffshore Project: Concatenated CCMP v0.2.1.NRT 6-hourly wind products provided by Remote Sensing Systems (RSS), for Irish Continental Shelf. Wind speed and direction have been calculated from the uwnd and vwnd variables. CCMP Version-2 vector wind analyses are produced by Remote Sensing Systems. Data are available at www.remss.com."
return icds
@dataclass
class IcsWindSpeedDirection(beam.PTransform):
"""
Selects a subset for the Irish Continental Shelf (ICS) region, and computes wind speed and
direction for the u and v components in the specified product. Dask arrays are
created for delayed execution.
"""
def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | beam.Map(_add_keys(ics_wind_speed_direction))
# TODO: I'm seeing issues where the required 8000 chunk size is used for the pruned recipe
# I think XarrayZarrRecipe reduces this to the total number of time observations when
# they're < specified chunk size. I need to investigate further.
# For now, chunk size is based on the pruned pattern size
# target_chunks = {"time": 8000, "latitude": -1, "longitude": -1}
target_chunks = {"time": pattern_pruned.dims["time"] * NITEMS_PER_FILE, "latitude": -1, "longitude": -1}
td = TemporaryDirectory()
target_path = td.name + "/ccmp.zarr"
target_path = "./ccmp.zarr"
target_path
transforms = (
beam.Create(pattern_pruned.items())
| OpenURLWithFSSpec(cache_url="./testcache")
| OpenWithXarray(file_type=pattern_pruned.file_type)
| IcsWindSpeedDirection()
| StoreToZarr(
target_url=target_path,
combine_dims=pattern.combine_dim_keys,
target_chunks=target_chunks,
)
)
with beam.Pipeline() as p:
p | transforms
ds = xr.open_zarr(target_path)
print(ds)
I've confirmed the expected output as I did with the CCMP staged recipe:
In [86]: ds = xr.open_zarr('./ccmp.zarr')
In [87]: ds
Out[87]:
<xarray.Dataset>
Dimensions: (height: 1, latitude: 50, longitude: 86, time: 8)
Coordinates:
* height (height) int64 10
* latitude (latitude) float32 45.88 46.12 46.38 ... 57.62 57.88 58.12
* longitude (longitude) float32 333.9 334.1 334.4 ... 354.6 354.9 355.1
* time (time) datetime64[ns] 2015-01-16 2015-01-16 ... 2015-01-17
Data variables:
nobs (time, latitude, longitude) float32 dask.array<chunksize=(8, 50, 86), meta=np.ndarray>
uwnd (time, latitude, longitude) float32 dask.array<chunksize=(8, 50, 86), meta=np.ndarray>
vwnd (time, latitude, longitude) float32 dask.array<chunksize=(8, 50, 86), meta=np.ndarray>
wind_direction (height, time, latitude, longitude) float32 dask.array<chunksize=(1, 8, 50, 86), meta=np.ndarray>
wind_speed (height, time, latitude, longitude) float32 dask.array<chunksize=(1, 8, 50, 86), meta=np.ndarray>
Attributes: (12/35)
Conventions: CF-1.6
comment: none
contact: Remote Sensing Systems, support@remss.com
contributor_name: Carl Mears, Joel Scott, Frank Wentz, Ross...
contributor_role: Co-Investigator, Software Engineer, Proje...
creator_email: support@remss.com
... ...
publisher_email: support@remss.com
publisher_name: Remote Sensing Systems
publisher_url: http://www.remss.com/
references: Mears et al., Journal of Geophysical Rese...
summary: CCMP_RT V2.1 has been created using the s...
title: RSS CCMP_RT V2.1 derived surface winds (L...
In [88]: ds.time.values
Out[88]:
array(['2015-01-16T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
'2015-01-16T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
'2015-01-17T00:00:00.000000000', '2015-01-17T00:00:00.000000000',
'2015-01-17T00:00:00.000000000', '2015-01-17T00:00:00.000000000'],
dtype='datetime64[ns]')
In [89]: ds.resample(time='D').mean().wind_speed.isel(latitude=0,longitude=0).compute()
Out[89]:
<xarray.DataArray 'wind_speed' (time: 2, height: 1)>
array([[10.636068],
[14.07321 ]], dtype=float32)
Coordinates:
* height (height) int64 10
latitude float32 45.88
longitude float32 333.9
* time (time) datetime64[ns] 2015-01-16 2015-01-17
Attributes:
long_name: Wind speed
units: m s-1
In [90]: ds.eooffshore_zarr_details
Out[90]: 'EOOffshore Project: Concatenated CCMP v0.2.1.NRT 6-hourly wind products provided by Remote Sensing Systems (RSS), for Irish Continental Shelf. Wind speed and direction have been calculated from the uwnd and vwnd variables. CCMP Version-2 vector wind analyses are produced by Remote Sensing Systems. Data are available at www.remss.com.'
FYI @rabernat:
target_chunks
to StoreToZarr
. Once I did this, the time chunking was as expected.XarrayZarrRecipe
currently reduces the (time) chunk size when the pruned pattern total size is lower (here, 8) than the specified chunk size (e.g. 8000). With the Beam version, if I specify the full size (8000) it looks like this reduction isn't happening, so I've based the size on the pruned pattern for now. I'll look into this and get back to you.pangeo_forge_recipes.transforms._add_keys
in the new transform.It looks like the time
values aren't being stored correctly in the output Zarr, i.e. in the output above:
In [88]: ds.time.values
Out[88]:
array(['2015-01-16T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
'2015-01-16T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
'2015-01-17T00:00:00.000000000', '2015-01-17T00:00:00.000000000',
'2015-01-17T00:00:00.000000000', '2015-01-17T00:00:00.000000000'],
dtype='datetime64[ns]')
These should be 6-hourly values as generated by the original recipe e.g.:
In [4]: ds.time.values
Out[4]:
array(['2015-01-16T00:00:00.000000000', '2015-01-16T06:00:00.000000000',
'2015-01-16T12:00:00.000000000', '2015-01-16T18:00:00.000000000',
'2015-01-17T00:00:00.000000000', '2015-01-17T06:00:00.000000000',
'2015-01-17T12:00:00.000000000', '2015-01-17T18:00:00.000000000'],
dtype='datetime64[ns]')
This might be a candidate for the proposed separate issue for migrating tutorials from notebooks -> scripts (with CI testing)
Following approach in #445, this will port the EOOffshore CCMP recipe (https://github.com/pangeo-forge/staged-recipes/pull/145) to use the
beam-refactor
branch. It will involve writing a customPTransform
for computing new wind speed and direction variables etc.