NOAA-OWP / hydrofabric

hydrofabric meta-package
https://noaa-owp.github.io/hydrofabric/
MIT License
27 stars 10 forks source link

Catchment Weights #34

Closed JordanLaserGit closed 8 months ago

JordanLaserGit commented 9 months ago

Catchment weights (the indices within the lat/lon divides of a particular catchment relative to some grid) are needed for extracting forcing data. The method of calculating these weights can vary and yield different weights depending on methods used See issue 28 in hfsubset. Also, these weights may change with each hydrofabric release. To help ensure catchment weights are consistent for a particular hydrofabric and grid, I suggest generating conus weight files for common grids(projections) that can be subsetted via a tool like hfsubset.

For example, NWM v3 uses a lambert conformal conic projection.

https://noaa-nwm-pds.s3.amazonaws.com/nwm.20240112/forcing_medium_range/nwm.t00z.medium_range.forcing.f001.conus.nc

>>> nwm_data.crs.esri_pe_string
'PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_Sphere",DATUM["D_Sphere",SPHEROID["Sphere",6370000.0,0.0]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP"],PARAMETER["false_easting",0.0],PARAMETER["false_northing",0.0],PARAMETER["central_meridian",-97.0],PARAMETER["standard_parallel_1",30.0],PARAMETER["standard_parallel_2",60.0],PARAMETER["latitude_of_origin",40.0],UNIT["Meter",1.0]];-35691800 -29075200 10000;-100000 10000;-100000 10000;0.001;0.001;0.001;IsHighPrecision'

The projection type and hydrofabric can be stored as fields in the weights.json. This way forcings engines could check that the weight projection is appropriate for the nwm file being processed. Information about the grid should be stored in the json as well (dx, dy, nx, ny, x0, y0).

This will place more responsibility on the hydrofabric, but might be worth it in the long run to avoid weight generation algorithms yielding different weights for the same hydrofabric and grid.

mikejohnson51 commented 8 months ago

Hi @JordanLaserGit, @program--

https://github.com/LynkerIntel/hfsubset/issues/28

Please take a look at this new capability and data source and let me know if its looking like what you need. I have to say it is pretty slick and actually reverts back to our initial data products pre forcing workstream!

I think the details might need a little TLC but lets see how you guys use them and we can adjust:

# Download per the link you shared and read in ...
fg = terra::rast('/Users/mjohnson/Downloads/nwm.t00z.medium_range.forcing.f001.conus.nc')

# Define divide ids however you like, here is a 2 divide example
divs = c("cat-1866993", "cat-1866995")

# Extract precomputed weights for the forcing input and divides
w = arrow::open_dataset('s3://lynker-spatial/v20.1/forcing_weights.parquet') |> 
  dplyr::filter(divide_id %in% divs, grid_id == "medium_range.forcing.conus") |>
  dplyr::collect()

# Extract cell level values from forcing grid
x = zonal::weight_grid_to_data(fg, w = w)

# Explore output
head(x)
#>      divide_id    cell coverage_fraction                    grid_id    U2D
#> 1: cat-1866993 7632614       0.006292699 medium_range.forcing.conus -1.083
#> 2: cat-1866993 7632615       0.083473243 medium_range.forcing.conus -1.089
#> 3: cat-1866993 7637220       0.007748126 medium_range.forcing.conus -1.039
#> 4: cat-1866993 7637221       0.167723924 medium_range.forcing.conus -1.045
#> 5: cat-1866993 7637222       0.868241370 medium_range.forcing.conus -1.052
#> 6: cat-1866993 7637223       0.815731525 medium_range.forcing.conus -1.058
#>       V2D  LWDOWN     RAINRATE    T2D      Q2D    PSFC SWDOWN
#> 1: -1.317 213.503 2.576764e-06 259.58 0.001265 83949.5      0
#> 2: -1.266 213.116 2.609972e-06 259.48 0.001254 84060.7      0
#> 3: -1.385 214.007 3.093114e-06 259.78 0.001284 83776.8      0
#> 4: -1.347 213.779 3.045424e-06 259.65 0.001272 83803.0      0
#> 5: -1.307 213.526 3.004070e-06 259.55 0.001262 83893.8      0
#> 6: -1.267 213.249 2.953562e-06 259.46 0.001253 84004.0      0

# Or, re-scale on the fly...
(x2 = zonal::execute_zonal(fg, w = w, ID = "divide_id"))
#>      divide_id                    grid_id        U2D       V2D   LWDOWN
#> 1: cat-1866993 medium_range.forcing.conus -0.9880872 -1.219789 213.9363
#> 2: cat-1866995 medium_range.forcing.conus -0.9106252 -1.148223 215.3706
#>        RAINRATE      T2D         Q2D     PSFC SWDOWN
#> 1: 4.062485e-06 259.5877 0.001264434 84141.37      0
#> 2: 5.190276e-06 259.7551 0.001279137 84386.35      0

# You can find the valid grid data structures in the top level JSON
jsonlite::fromJSON('https://lynker-spatial.s3.amazonaws.com/v20.1/forcing_grids.json',
simplifyDataFrame = TRUE)
#>                      grid_id   schema       X1      Xn       Y1      Yn resX
#> 1 medium_range.forcing.conus climateR -2303999 2304001 -1920000 1920000 1000
#>   resY ncols nrows
#> 1 1000  4608  3840
#>                                                                                             crs
#> 1 +proj=lcc +lat_0=40 +lon_0=-97 +lat_1=30 +lat_2=60 +x_0=0 +y_0=0 +R=6370000 +units=m +no_defs

Created on 2024-02-06 with reprex v2.0.2

mikejohnson51 commented 8 months ago

attn: @ZacharyWills

JordanLaserGit commented 8 months ago

@mikejohnson51 this is exactly what I was hoping for! will implement this week and i'll get back to you. thank you sir!

JordanLaserGit commented 8 months ago

@mikejohnson51 Thoughts on reformatting the parquet to something like this?

Screenshot 2024-02-07 082516

Motivation for me is the forcingprocessor has an internal loop through catchments. It'll need to do a seek for each catchment key in the current file layout. I can prepare my own file from s3://lynker-spatial/v20.1/forcing_weights.parquet easily if it's preferable to keep the current data layout.

mikejohnson51 commented 8 months ago

Not opposed per say that looks a lot like a JSON rather parquet based on my rudimentary knowledge. I would consider the advantages of the tabular form rather then loop for a few reasons that were the basis for zonal. I can spit out a converted file for testing though!

JordanLaserGit commented 8 months ago

oh it is json! i phrased that poorly. no worries, I can create my own for our conus daily run. main purpose of this was to have standardized weights so it's all good! thanks @mikejohnson51

mikejohnson51 commented 8 months ago

@JordanLaserGit just to elaborate a bit on that top point, I think working from the table can be very fast and I would wager, be a preferred approach to looping. Below is a CONUS regrid on my local laptop, no multi processing, parallelization or all the stuff you guys rock at. If its of interest maybe we can write a execute_zonal equivalent for your python needs?

image

So I imagine with the way you can distribute processing you could rescale CONUS for an 18 hour forecast in a minute over a few processers. Im now out of my depths :)

JordanLaserGit commented 8 months ago

man that's fast! forcingprocessor takes about 40 seconds to create something equivalent to oo . With 24 cores it takes a little over 3 minutes to read in 24hrs worth of nwm forcing data and write the 800k catchment csvs out. This is a small portion of the total datastream runtime, so not a pressing issue, but if zonal is faster, probably makes sense to move away from my python method and towards zonal in the long run.

program-- commented 8 months ago

@JordanLaserGit You may also consider trying to work with pyarrow RecordBatches instead of loading in the entire parquet file if you're not already, since you'll know the divide IDs beforehand (or use polars, since it's better than pandas 😛). For example:

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset

divs = ["cat-1866993", "cat-1866995"]

def print_batches():
    w = pa.dataset.dataset(
        's3://lynker-spatial/v20.1/forcing_weights.parquet', format='parquet'
    ).filter(
        pc.field('divide_id').isin(divs)
    ).to_batches()

    batch: pa.RecordBatch
    for batch in w:
        tbl = batch.to_pandas()
        if tbl.empty:
            continue

        print(tbl)
        #>       divide_id       cell  coverage_fraction                     grid_id
        #> 0   cat-1866993  7632614.0           0.006293  medium_range.forcing.conus
        #> 1   cat-1866993  7632615.0           0.083473  medium_range.forcing.conus
        #> 2   cat-1866993  7637220.0           0.007748  medium_range.forcing.conus
        #> ..          ...        ...                ...                         ...
        #> 43  cat-1866995  7669480.0           0.101241  medium_range.forcing.conus
        #> 44  cat-1866995  7669481.0           0.099398  medium_range.forcing.conus

if __name__ == '__main__':
    import timeit
    print(timeit.timeit("print_batches()", globals=locals(), number=3))
    #> 10.545011819922365

With the pyarrow compute kernel, you can also perform aggregation before reading through the batches IIRC. If I can get an example, I'll edit it onto here.

JordanLaserGit commented 8 months ago

@program-- that's handy! our main application is a conus run so we have to load the whole thing anyway, but this will be helpful when we are subsetting, which we are supporting.

program-- commented 8 months ago

@program-- that's handy! our main application is a conus run so we have to load the whole thing anyway, but this will be helpful when we are subsetting, which we are supporting.

I'd consider it for CONUS too -- you can potentially group batches by ID, then grouped aggregation becomes embarrassingly parallel. Since you're streaming records, the memory footprint is lower too (polars has some handy methods for doing all of this via lazy frames)

JordanLaserGit commented 8 months ago

@program-- i'll give it a try!