NOAA-OWP / hydrotools

Suite of tools for retrieving USGS NWIS observations and evaluating National Water Model (NWM) data.

Add client tool for NWM 2.1 Retrospective data #157

Open jarq6c opened 2 years ago

jarq6c commented 2 years ago

The NWM 2.1 Retrospective data have been released in zarr format on AWS. Add a client tool to retrieve these data as HydroTools canonical dataframes.

Example to get going:

import xarray as xr
import fsspec

def get_aws_data(start, end, features):
    """Get data from AWS"""

    # Base URL
    url = r'https://noaa-nwm-retrospective-2-1-zarr-pds.s3.amazonaws.com/chrtout.zarr'
    # url = r'https://noaa-nwm-retrospective-2-1-zarr-pds.s3.amazonaws.com/precip.zarr'

    # Get xarray.dataset
    ds = xr.open_zarr(fsspec.get_mapper(url), consolidated=True)

    # Extract time series data
    ts = ds.streamflow.sel(time=slice(start, end), feature_id=features)

    # Return DataFrame
    return ts.to_dataframe().reset_index()

def main():
    # Get pandas.DataFrame
    df = get_aws_data(
        start="2014-01-01 00:00",
        end="2014-01-12 15:00",
        features=[101, 179]
        )

    # Print
    print(df)

if __name__ == "__main__":
    main()

Output

                   time  feature_id   elevation             gage_id   latitude  longitude  order  streamflow
0   2014-01-01 00:00:00         101   36.950001  b'               '  31.086876 -94.640541      3        0.60
1   2014-01-01 00:00:00         179  191.600006  b'               '  46.022163 -67.986412      1        0.01
2   2014-01-01 01:00:00         101   36.950001  b'               '  31.086876 -94.640541      3        0.60
3   2014-01-01 01:00:00         179  191.600006  b'               '  46.022163 -67.986412      1        0.01
4   2014-01-01 02:00:00         101   36.950001  b'               '  31.086876 -94.640541      3        0.59
..                  ...         ...         ...                 ...        ...        ...    ...         ...
555 2014-01-12 13:00:00         179  191.600006  b'               '  46.022163 -67.986412      1        0.67
556 2014-01-12 14:00:00         101   36.950001  b'               '  31.086876 -94.640541      3        0.45
557 2014-01-12 14:00:00         179  191.600006  b'               '  46.022163 -67.986412      1        0.62
558 2014-01-12 15:00:00         101   36.950001  b'               '  31.086876 -94.640541      3        0.45
559 2014-01-12 15:00:00         179  191.600006  b'               '  46.022163 -67.986412      1        0.61

[560 rows x 8 columns]
jarq6c commented 2 years ago

Here's an updated example. Recent versions of xarray and zarr perform the fsspec magic automatically.

import xarray as xr

def get_aws_data(start, end, features):
    """Get data from AWS"""

    # Base URL
    url = r's3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr'
    # url = r's3://noaa-nwm-retrospective-2-1-zarr-pds/precip.zarr'

    # Get xarray.dataset
    # Requires s3fs be installed
    ds = xr.open_dataset(
        url,
        backend_kwargs={
            "storage_options": {"anon": True},
            "consolidated": True
        },
        engine="zarr"
    )

    # Extract time series data
    ts = ds.streamflow.sel(time=slice(start, end), feature_id=features)

    # Return DataFrame
    return ts.to_dataframe().reset_index()

def main():
    # Get pandas.DataFrame
    df = get_aws_data(
        start="2014-01-01 00:00",
        end="2014-01-12 15:00",
        features=[101, 179]
        )

    # Print
    print(df)

if __name__ == "__main__":
    main()
jarq6c commented 2 years ago

I experimented with alternative methods of data retrieval, mostly focused on getting dask to automatically partition the data and control memory usage for large requests. However, the additional overhead did not seem to scale well (at least for potential evaluation applications) on a LocalCluster. If your goal is retrieving large contiguous blocks of data, dask-based retrieval may be the way to go; for random retrieval (say, observation locations spread across thousands of chunks), the plain xarray methods above seem to be the most efficient.
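
For reference, a dask-backed retrieval of a contiguous block might look something like the sketch below (a minimal example; the slice bounds are illustrative and s3fs must be installed):

import xarray as xr

# Open the store lazily with dask arrays using the source chunking
ds = xr.open_dataset(
    "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr",
    backend_kwargs={"storage_options": {"anon": True}, "consolidated": True},
    engine="zarr",
    chunks="auto"
)

# Select a contiguous block; nothing is read from S3 until .compute()
subset = ds.streamflow.sel(
    time=slice("2010-01-01", "2010-01-31"),
    feature_id=slice(101, 1000)
)

# Read the chunks (in parallel via dask) and convert to a DataFrame
df = subset.compute().to_dataframe().reset_index()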

That said, there may be ways to structure our calls to enforce reasonable limits and yield a rechunked dataset that lends itself to local processing in a dask context. For this, the goal would be to retrieve DataFrames of roughly 100 MB (per dask's partition-size recommendation). Each of these DataFrames could then become a single dask.dataframe.DataFrame partition.
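
One way to realize that idea (a rough sketch, not tested against the service; `requests` below is a hypothetical list of (feature group, start, end) tuples) would be to wrap each ~100 MB retrieval in dask.delayed and assemble the pieces with dask.dataframe.from_delayed:

import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def retrieve_partition(ds, feature_ids, start, end):
    """Pull one ~100 MB block as an optimized pandas DataFrame."""
    df = ds.streamflow.sel(
        time=slice(start, end),
        feature_id=feature_ids
    ).to_dataframe().reset_index()
    df["streamflow"] = pd.to_numeric(df["streamflow"], downcast="float")
    return df

# Each delayed call becomes one partition of the dask DataFrame
# requests = [(fid_group, start, end), ...]  # hypothetical request plan
# parts = [retrieve_partition(ds, fids, s, e) for fids, s, e in requests]
# ddf = dd.from_delayed(parts)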

Goal: Retrieve random zarr data while reading each chunk as few times as possible.

Step 1 - Determine chunking of source data

Setting chunks="auto" returns a Dataset whose variables are backed by dask arrays (using the source data chunking) rather than in-memory numpy arrays. At this point, we're only doing this to determine the source chunking. Here we can see that the data are chunked by time and feature_id: the time chunk size is 336 and the feature_id chunk size is 15000. Note the output below omits warnings raised by xarray because the final chunk differs in size (the data are not chunked evenly).

ds = xr.open_dataset(
    "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr",
    backend_kwargs={
        "storage_options": {"anon": True},
        "consolidated": True
    },
    engine="zarr",
    chunks="auto"
)
print(ds)
<xarray.Dataset>
Dimensions:     (feature_id: 2776738, time: 367439)
Coordinates:
    elevation   (feature_id) float32 dask.array<chunksize=(2776738,), meta=np.ndarray>
  * feature_id  (feature_id) int32 101 179 181 ... 1180001803 1180001804
    gage_id     (feature_id) |S15 dask.array<chunksize=(2776738,), meta=np.ndarray>
    latitude    (feature_id) float32 dask.array<chunksize=(2776738,), meta=np.ndarray>
    longitude   (feature_id) float32 dask.array<chunksize=(2776738,), meta=np.ndarray>
    order       (feature_id) int32 dask.array<chunksize=(2776738,), meta=np.ndarray>
  * time        (time) datetime64[ns] 1979-02-01T01:00:00 ... 2020-12-31T23:0...
Data variables:
    crs         |S1 ...
    streamflow  (time, feature_id) float64 dask.array<chunksize=(336, 15000), meta=np.ndarray>
    velocity    (time, feature_id) float64 dask.array<chunksize=(336, 15000), meta=np.ndarray>
Attributes:
    TITLE:                OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:         v5.2.0-beta2
    featureType:          timeSeries
    model_configuration:  retrospective
    proj4:                +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...

Step 2 - Determine chunk size as a DataFrame

Below we're just trying to figure out how big our DataFrames will be on a per chunk basis. Each chunk will produce a dataframe with 5,040,000 rows that is ~157 MB in RAM after optimizations. So to hit ~100 MB per dataframe, we'll want to generate dataframes with ~3,210,191 rows.

import pandas as pd

# Continues from Step 1: `ds` is the dataset opened above

# Determine size of a single chunk
fid_list = ds.feature_id.data[:15000]
tm_list = ds.time.data[:336]

# Get single chunk of streamflow variable
df = ds.streamflow.sel(
    time=tm_list, 
    feature_id=fid_list
    ).to_dataframe().reset_index()

# Look at memory usage
print(df.info(memory_usage="deep"))

# Optimize memory usage
df["gage_id"] = df["gage_id"].astype("category")
df["streamflow"] = pd.to_numeric(df["streamflow"], downcast="float")
df["feature_id"] = pd.to_numeric(df["feature_id"], downcast="integer")
print(df.info(memory_usage="deep"))
BEFORE OPTIMIZATION
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5040000 entries, 0 to 5039999
Data columns (total 8 columns):
 #   Column      Dtype         
---  ------      -----         
 0   time        datetime64[ns]
 1   feature_id  int64         
 2   elevation   float32       
 3   gage_id     object        
 4   latitude    float32       
 5   longitude   float32       
 6   order       int32         
 7   streamflow  float64       
dtypes: datetime64[ns](1), float32(3), float64(1), int32(1), int64(1), object(1)
memory usage: 461.4 MB
None

AFTER OPTIMIZATION
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5040000 entries, 0 to 5039999
Data columns (total 8 columns):
 #   Column      Dtype         
---  ------      -----         
 0   time        datetime64[ns]
 1   feature_id  int32         
 2   elevation   float32       
 3   gage_id     category      
 4   latitude    float32       
 5   longitude   float32       
 6   order       int32         
 7   streamflow  float32       
dtypes: category(1), datetime64[ns](1), float32(4), int32(2)
memory usage: 158.6 MB
None

Step 3 - Make some assumptions

Let's say we want all 40 years of data for 8000 non-contiguous feature_ids. In other words, there's probably no way around reading every chunk at least once. Let's assume our feature_ids of interest are evenly distributed across all 186 feature_id chunks, so we can expect about 43 of our interesting feature_ids to occur in each chunk on average. To hit our goal of ~3,210,191 rows per DataFrame, we'll want to retrieve 74,656 time steps per call. Rounding down to a whole number of time chunks gives a time slice that is 74,592 hours wide. This produces an optimized DataFrame that is 94.8 MB in memory with 3,207,456 rows (the memory footprint is lower than the ~100 MB target partly because the feature_ids used in this example are small and downcast further). Retrieving all of our desired data will require 874 separate calls.
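
For reference, the per-call sizing above can be reproduced with some quick arithmetic (just a back-of-the-envelope check; the row target comes from Step 2):

# ~8000 target features spread evenly across 186 feature_id chunks
features_per_call = 8000 // 186                              # about 43 per chunk

# ~3,210,191 rows per DataFrame (~100 MB after dtype optimization)
timesteps_per_call = round(3_210_191 / features_per_call)    # about 74,656 hours

# Round down to a whole number of 336-hour time chunks
timesteps_per_call = (timesteps_per_call // 336) * 336       # 74,592 hours
print(features_per_call, timesteps_per_call)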

fid_list = ds.feature_id.data[:43]
tm_list = ds.time.data[:74592]

df = ds.streamflow.sel(
    time=tm_list, 
    feature_id=fid_list
    ).to_dataframe().reset_index()
df["gage_id"] = df["gage_id"].astype("category")
df["streamflow"] = pd.to_numeric(df["streamflow"], downcast="float")
df["feature_id"] = pd.to_numeric(df["feature_id"], downcast="integer")
print(df.info(memory_usage="deep"))
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3207456 entries, 0 to 3207455
Data columns (total 8 columns):
 #   Column      Dtype         
---  ------      -----         
 0   time        datetime64[ns]
 1   feature_id  int16         
 2   elevation   float32       
 3   gage_id     category      
 4   latitude    float32       
 5   longitude   float32       
 6   order       int32         
 7   streamflow  float32       
dtypes: category(1), datetime64[ns](1), float32(4), int16(1), int32(1)
memory usage: 94.8 MB
None

Step 4 - Scaling it up

The previous assumptions were suitable for someone working on a laptop with limited cores and memory. However, we can do better with more resources. A natural inclination would be to use dask to conduct the retrieval, but I'm not convinced that will be efficient for this type of random retrieval: the big problem is that we want relatively little data spread across tens of thousands of chunks. With more resources, a potential alternative algorithm could use the methods above, but make larger requests that generate larger DataFrames (say ~200 to ~400 MB). Using to_dask_dataframe, these larger DataFrames can easily be repartitioned to ~100 MB.
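
A minimal sketch of that alternative (the time bounds are illustrative; fid_list is a group of target feature_ids as above, and dask must be installed):

# Make a larger request (~200-400 MB) and let dask split it up
ddf = ds[["streamflow"]].sel(
    time=slice("1990-01-01", "1999-12-31"),
    feature_id=fid_list
).to_dask_dataframe()

# Rechunk to ~100 MB partitions for local processing
ddf = ddf.repartition(partition_size="100MB")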

Step 5 - Validation

This is the most difficult step. There doesn't seem to be a good way to validate the data unless the data provider supplies checksums for every chunk as part of their data model. Furthermore, if a chunk fails to return, it's unclear how that information gets funneled up to the user. The xarray-fsspec-s3fs-dask-zarr stack is complicated and, as near as I can tell, chunk failures get silenced by one of these packages. Further, because missing data are automatically filled in, it's unclear whether missing values are genuinely missing or the result of a chunk that failed to return. The only way I can think of to validate is to pull the data at least twice and compare the outputs using DataFrame.eq.
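
As a rough sketch of that pull-twice check, reusing the get_aws_data example from the first comment:

# Retrieve the same selection twice
kwargs = dict(start="2014-01-01 00:00", end="2014-01-12 15:00", features=[101, 179])
df1 = get_aws_data(**kwargs)
df2 = get_aws_data(**kwargs)

# Element-wise comparison; note that NaN != NaN, so genuinely missing values
# also show up as mismatches here and need to be handled separately
print(df1.eq(df2).all())

# Alternatively, treat NaNs in identical positions as equal
print(df1.equals(df2))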

jarq6c commented 2 years ago

Here's an example implementation of the principles explained above.

import xarray as xr
import numpy as np
import pandas as pd
from pathlib import Path
# from dask.distributed import Client, LocalCluster

def retrieve_data(params):
    # Extract parameters
    ds = params[0]
    feature_ids = params[1]
    start = params[2]
    end = params[3]
    ofile = params[4]

    # Retrieve data
    #     I use to_netcdf here because it writes in parallel
    ds[["streamflow"]].sel(
        time=slice(start, end), 
        feature_id=feature_ids
        ).to_netcdf(ofile)

def main():
    # Open dataset
    ds = xr.open_dataset(
                "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr",
                backend_kwargs={
                    "storage_options": {"anon": True},
                    "consolidated": True
                },
                engine="zarr",
            )

    # Find features that correspond to a USGS streamflow gage
    gages = (ds.gage_id.where(ds.gage_id != ''.rjust(15).encode(), drop=True))

    # Get a list of feature IDs
    fids = gages.feature_id.data

    # Split up feature IDs into groups of 43
    no_chunks = (fids.size // 43) + 1
    fids_split = np.array_split(fids, no_chunks)

    # Generate time slices
    start = pd.Timestamp(ds.time.min().values)
    end = pd.Timestamp(ds.time.max().values)
    periods = int((end - start) / pd.Timedelta("74592H")) + 1
    first_ts = pd.date_range(
        start=start,
        periods=periods,
        freq="74592H"
    )
    last_ts = first_ts + pd.Timedelta("74591H")

    # File directory
    odir = Path("retro_data")
    odir.mkdir(exist_ok=True)

    # Generate parameters
    input_parameters = []
    count = 0
    for fid_chunk in fids_split:
        for ft, lt in zip(first_ts, last_ts):
            ofile = odir / f"chunk_{count}.nc"
            p = [ds, fid_chunk, ft, lt, ofile]
            input_parameters.append(p)
            count += 1

    # Retrieve data in "serial"
    # NOTE This actually retrieves the data asynchronously by source chunk and
    #   writes the NetCDF files in parallel using all cores
    for params in input_parameters:
        retrieve_data(params)

    # Retrieve data in parallel
    # NOTE I don't think this will actually work and is probably unnecessary
    #   However, you could setup a Client(LocalCluster) to limit the number
    #   of processes and/or RAM used. Though the chunking scheme inherent to 
    #   input_parameters is already conservative.
    # cluster = LocalCluster(
    #     n_workers=4,
    #     processes=True,
    #     threads_per_worker=1
    # )
    # with Client(cluster) as client:
    #     client.map(retrieve_data, input_parameters)

    # Close dataset
    ds.close()

    # Do some larger-than-memory computations
    # NOTE I'm using dask to compute here, but you could also transform, optimize, shuffle, 
    #     and write the data to parquet
    df = xr.open_mfdataset(odir.glob("*.nc")).to_dask_dataframe()
    computation = df.groupby("feature_id").max().compute()
    print(computation)

if __name__ == "__main__":
    main()

To make the "parallel" version work, I think you'd have to use futures or delayed objects. I think every process would require its own ds = xr.open_dataset, which involves some overhead. This may be the best we can do on a single machine. Making this go faster would rely on retrieving bigger chunks.

Having said that, there is potential here to scale up to a real distributed cluster.

jameshalgren commented 9 months ago

@jarq6c, I tried to track down information and couldn't find anything that explains the difference between open_dataset and open_zarr... any light to shed on that?

Here's an example of open_zarr opening the same set of files you refer to above. @sepehrkrz, maybe we can do a performance test...

'''
#install these libraries if they aren't already installed
!pip install zarr
!pip install xarray
!pip install s3fs
!pip install numpy
'''
# Import needed libraries

import xarray as xr
import numpy as np
import s3fs
from datetime import datetime, timedelta

# open the zarr store
url = "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr"
fs = s3fs.S3FileSystem(anon=True)
store = xr.open_zarr(s3fs.S3Map(url, s3=fs))

# Function to get the time series for a specified reach id and time range
# then write it out to a csv file.
def GetAndWriteTimeSeriesAtReach(reach_id, start_time_index, end_time_index):
    flows = streamflow_array.where(feature_id_array==reach_id, drop=True)
    df_flows = flows[start_time_index:end_time_index].to_dataframe()
    df_flows.to_csv(f'flows_{reach_id}.csv')

# get an xarray array of the various values
time_array = store['time']
feature_id_array = store['feature_id']
streamflow_array = store['streamflow']

# Define the feature IDs to check for
feature_ids = [5781221, 5781223, 5781703]

# Specify the start and end times of interest
start_time = datetime(2015, 5, 23, 0, 0, 0)
end_time = datetime(2015, 6, 24, 0, 0, 0)

# Get the indices for the needed dates
zero_start_time = datetime(1979, 2, 1, 0, 0, 0)
start_time_index = int((start_time - zero_start_time).total_seconds() / 3600)
end_time_index = int((end_time - zero_start_time).total_seconds() / 3600)

for reach_id in feature_ids:
    GetAndWriteTimeSeriesAtReach(reach_id, start_time_index, end_time_index)
jarq6c commented 9 months ago

@jameshalgren Sorry for the delayed response. In practice, I haven't found any difference between open_dataset and open_zarr, assuming engine="zarr" is used with open_dataset. Peaking at the source code in the xarray repo, it does look like open_dataset introduces a few additional layers and sets up a pythonic file system (via fsspec) for you. Anecdotally, I found some posts on stack from over a year ago that suggested a performance difference between open_dataset and open_zarr, but offered no evidence.