Ouranosinc / xclim

Library of derived climate variables, ie climate indicators, based on xarray.
https://xclim.readthedocs.io/en/stable/
Apache License 2.0

Efficient Application of Bias Correction #1255

Open kckornelsen opened 1 year ago

kckornelsen commented 1 year ago

Description

I am trying to use xclim to bias correct precipitation from ECCC's GEPS product (an ensemble NWP system) against CaPA data. The NWP model has 20 ensemble members and 16 lead times in the forecast. The data is applied to N different watersheds (each a single time series, not gridded data). Through some research we determined that each GEPS lead time has a slightly different bias. The EQM method was selected and its parameters are pre-computed. The current code takes quite a bit of time to run and I would like to know if there are ways to speed it up, particularly around the nested for loops. Is there a built-in xarray or xclim function for this?

What I Did

Assume we start with a bit of house keeping, imports, identifying folders, etc.

def process_each_lead_time(leadtime_length, ini_time, geps_fcst, path_eqm_par_folder, path_output_geps, realizations, stations, i_ltime):

    # define the time slot for this leadtime 
    ltime_start = ini_time + np.timedelta64(leadtime_length*i_ltime, 'h')  + np.timedelta64(1,'h')
    ltime_end = ini_time + np.timedelta64(leadtime_length*(i_ltime+1), 'h')

    #select geps dataset within the time slot
    geps_fcst_ltime = geps_fcst.sel(time=slice(ltime_start, ltime_end)).copy(deep=True)

    # read eqm parameters for this lead time 
    eqm_pars = xr.open_dataset(os.path.join(path_eqm_par_folder,"Leadtime_"+str(i_ltime + 1) + "_EQM.nc")) 
    eqm_pars.close()

    # correct geps for each station and realization 
    for i_stations in range(0,len(stations)):
        i_station = stations[i_stations]
        geps_fcst_ltime_station = geps_fcst_ltime.sel(stations=i_station)

        stn = station_names[i_station]
        with open("../../Export/GEPS_FCST2/log.txt", "a") as f:
            print("INFO: processing station", i_station, "or", stn, ",at leadtime ",i_ltime, file=f)
            # print(i_station, file=f)
        for i_ens in range(0,len(realizations)):

            i_en = realizations[i_ens]

            # select parameter for this station and this realization 
            idx = i_stations*22 + i_ens
            EMQ_par = eqm_pars.sel(station_ens = idx)

            geps_ltime_uncorrect = geps_fcst_ltime_station.sel(realization = i_en)["PC_NWP"]

            #create EMQ model based on this parameter
            QM_model = sdba.EmpiricalQuantileMapping.from_dataset(EMQ_par)

            # correct geps forecast 
            geps_ltime_corrected = QM_model.adjust(geps_ltime_uncorrect,extrapolation="constant", interp="linear") 
            geps_ltime_corrected = geps_ltime_corrected.fillna(0)
            geps_ltime_corrected = xr.where((geps_ltime_corrected>1000),0,geps_ltime_corrected)

            # assign forecast to the dataset 
            geps_fcst["PC_NWP_CORR"].loc[dict(time=slice(ltime_start, ltime_end),realization = i_en,stations = i_station)] = geps_ltime_corrected.values

    return geps_fcst

#loop for lead times
for i_ltime in range(0,n_leadtimes):

    start_time = time.time()

    geps_fcst = process_each_lead_time(leadtime_length, ini_time, geps_fcst, path_eqm_par_folder, path_output_geps, realizations, stations, i_ltime)

    print("Total time for each leadtime is ",time.time() -  start_time)

geps_fcst["PC_NWP_CORR"].attrs['long_name'] = 'PC_NWP_CORR'
geps_fcst["PC_NWP_CORR"].attrs['ensemble'] = 'GEPS_CORR'
geps_fcst["PC_NWP_CORR"].attrs['units'] = 'mm'
geps_fcst = geps_fcst.drop_vars("PC_NWP")
geps_fcst.to_netcdf(path_output_geps)

geps_fcst.close()

What I Received

This function works and provides the expected results, but even with the parallelized version below it takes ~25 min to run. This is for ~150 sites with only a 14-day forecast lead time.

from joblib import Parallel, delayed

output_ncs = Parallel(n_jobs=4)(delayed(process_each_lead_time)(leadtime_length,ini_time,geps_fcst,path_eqm_par_folder,path_output_folder,realizations,stations,i_ltime) for i_ltime in range(0,n_leadtimes))

aulemahal commented 1 year ago

Hi @kckornelsen!

There are multiple levels at which to answer your question.

1. Vectorize the computation

xclim is based on xarray, which already knows how to apply an operation over a given dimension while broadcasting over all the other dimensions. You don't need to explicitly iterate over stations and realization, as long as your calibration data has the same shape as the simulation data (except for the 'time' axis), which it should if you used xclim to generate it.

All sdba adjustment methods take a group argument that indicates over which dimension the process is applied. In your case, I assume a simple group='time' was passed? Even if you were using the moving-window feature (time.dayofyear), the answer below would stay the same.

Here's a rewrite of your code that drops the explicit for loops. I did not test it and I might have overlooked specificities of your data, but I can try to improve the example if you hit specific errors. I added comments prefixed with PB:.

def process_each_lead_time(leadtime_length, ini_time, geps_fcst, path_eqm_par_folder, path_output_geps, realizations, stations, i_ltime):

    # define the time slot for this leadtime 
    ltime_start = ini_time + np.timedelta64(leadtime_length*i_ltime, 'h')  + np.timedelta64(1,'h')
    ltime_end = ini_time + np.timedelta64(leadtime_length*(i_ltime+1), 'h')

    #select geps dataset within the time slot
    geps_fcst_ltime = geps_fcst.sel(time=slice(ltime_start, ltime_end)).copy(deep=True)

    # read eqm parameters for this lead time 
    # PB: I am assuming the trained data has the same shape as the simulated data (except for the time dimension of course)
    eqm_pars = xr.open_dataset(
        os.path.join(path_eqm_par_folder,"Leadtime_"+str(i_ltime + 1) + "_EQM.nc"),
    ) 

    geps_uncorrect = geps_fcst_ltime["PC_NWP"]

    #create EQM model based on these parameters
    QM_model = sdba.EmpiricalQuantileMapping.from_dataset(eqm_pars)

    # correct geps forecast 
    geps_corrected = QM_model.adjust(geps_uncorrect, extrapolation="constant", interp="linear") 
    geps_corrected = geps_corrected.fillna(0)
    geps_corrected = xr.where((geps_corrected > 1000), 0, geps_corrected)

    # assign forecast to the dataset 
    geps_fcst["PC_NWP_CORR"] = geps_corrected            
    return geps_fcst

The issue could be RAM usage. You said you have 150 stations and 20 members. GEPS is usually 2 weeks long, isn't it? With 3-hourly data, that's 112 timesteps. 112 x 150 x 20 x float64 ~ 2.5 MB. That's extremely small. xclim has been successfully used on datasets larger than 10 GB.
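For reference, the arithmetic behind that estimate, as a quick sanity check:

n_bytes = 112 * 150 * 20 * 8          # timesteps x stations x members x bytes per float64
print(f"{n_bytes / 2**20:.1f} MiB")   # -> 2.6 MiB, i.e. roughly the ~2.5 MB quoted above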

I also see that your trained data doesn't have the same shape as what I expected. Was it generated with xclim? In that case, the logic is the same.

As you can see, this had the side effect of removing the logging. Hopefully, it will be fast enough and you won't need it?
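For completeness, here is a minimal, untested sketch of what the corresponding training step could look like without per-station loops. The names capa_hist (CaPA reference, dims (time, stations)) and geps_hist (GEPS training series, dims (time, realization, stations)) are hypothetical, and both are assumed to carry a "units" attribute:

from xclim import sdba

# Give the reference the same shape as the simulation, per the assumption above.
# broadcast_like may drop attrs, so re-assert the units afterwards.
ref = capa_hist.broadcast_like(geps_hist)
ref.attrs["units"] = capa_hist.attrs.get("units", "mm")

QM = sdba.EmpiricalQuantileMapping.train(
    ref, geps_hist, nquantiles=20, kind="*", group="time.month"
)
# QM.ds keeps the (stations, realization) dims, so QM.adjust(...) needs no loops.
QM.ds.to_netcdf("eqm_params_leadtime_1.nc")  # file name only as an example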

2. Parallelism

A second answer to your question is: best performance is achieved using dask. xclim was created with large datasets in mind, and sdba even more so. We decided to follow in xarray's footsteps and make dask the core module for parallelism and lazy computation.

Quickly: dask operates transparently with xarray and numpy. The idea is to divide the data into "chunks". Each time an operation is requested on an xarray DataArray, it is performed in parallel on each chunk. Moreover, the computation is not actually done until requested. So after a few xarray operations, what you have is a graph of operations and chunks. Upon computation (ds.load() or ds.to_netcdf(...), for example), dask will try to optimize this graph and run the operations in parallel, as efficiently as possible in both speed and memory usage.

I invite you to read some tutorials about using xarray with dask (for example this one) and look at the examples in the xclim documentation.

In your case, using dask might not even be useful, as the data is so small. I usually recommend chunks between 10 and 20 MB.
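If you do chunk the data, here is a rough, untested sketch of how to check the chunk size you actually end up with (hypothetical names, for a dask-backed DataArray):

da = geps_fcst["PC_NWP"]                  # assumed dask-backed, see the open_dataset call below
first_chunk_elems = 1
for sizes in da.chunks:                   # da.chunks is a tuple of per-dimension chunk sizes
    first_chunk_elems *= sizes[0]
print(first_chunk_elems * da.dtype.itemsize / 1e6, "MB per chunk,",
      da.nbytes / 1e6, "MB total")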

Here's another version using dask:

import dask
from dask.diagnostics import ProgressBar

# PB: set dask's configuration, 4 processes.
# PB: The default method uses as many _threads_ as there are cpu cores on the machine
# PB: This is a bit annoying, but _threads_ are not optimal for some of the operations sdba uses, so we switch to _processes_
dask.config.set(num_workers=4, scheduler='processes')

# PB: For the example, I am chunking the data along the stations dimension.
# PB: Choosing the optimal chunks is not easy. For sdba you should never chunk along "time".
geps_fcst = xr.open_dataset(
    path_to_geps,
    chunks={'stations': 1}
)

def process_each_lead_time(leadtime_length, ini_time, geps_fcst, path_eqm_par_folder, path_output_geps, realizations, stations, i_ltime):

    # define the time slot for this leadtime 
    ltime_start = ini_time + np.timedelta64(leadtime_length*i_ltime, 'h')  + np.timedelta64(1,'h')
    ltime_end = ini_time + np.timedelta64(leadtime_length*(i_ltime+1), 'h')

    #select geps dataset within the time slot
    geps_fcst_ltime = geps_fcst.sel(time=slice(ltime_start, ltime_end)).copy(deep=True)

    # read eqm parameters for this lead time 
    # PB: I am assuming the trained data has the same shape as the simulated data (except for the time dimension of course)
    eqm_pars = xr.open_dataset(
        os.path.join(path_eqm_par_folder,"Leadtime_"+str(i_ltime + 1) + "_EQM.nc"),
        chunks={'stations': 1}  # PB: so the chunks fit the ones on "geps".
    ) 

    geps_uncorrect = geps_fcst_ltime["PC_NWP"]

    #create EQM model based on these parameters
    QM_model = sdba.EmpiricalQuantileMapping.from_dataset(eqm_pars)

    # correct geps forecast 
    geps_corrected = QM_model.adjust(geps_uncorrect, extrapolation="constant", interp="linear") 
    geps_corrected = geps_corrected.fillna(0)
    geps_corrected = xr.where((geps_corrected > 1000), 0, geps_corrected)

    with ProgressBar():
        # PB: the computation will be done here.
        geps_corrected.load()    

    # assign forecast to the dataset 
    geps_fcst["PC_NWP_CORR"] = geps_corrected            
    return geps_fcst

3. Vectorize along lead times too

If solution 1 works well, you might be able to vectorize along the lead times as well, instead of iterating: we extract all lead times and stack them along a new dimension before adjusting, then unstack each lead time afterwards. We have helpers in xclim for this kind of data restructuring, but they were meant for applying adjustments of daily data on moving multi-year windows, so they might not be ideal here. It also uses more xarray magic, so it loses points on the "readability" side.


def extract_ltime(ds, leadtime_length, ini_time, i_ltime):
    # define the time slot for this leadtime
    ltime_start = ini_time + np.timedelta64(leadtime_length*i_ltime, 'h')  + np.timedelta64(1,'h')
    ltime_end = ini_time + np.timedelta64(leadtime_length*(i_ltime+1), 'h')

    # select within the time slot
    # and add a new dimension with the lead number
    return ds.sel(time=slice(ltime_start, ltime_end)).expand_dims(lead=[i_ltime])

# PB: We do this to get a reference time coordinate
geps_ltime_0 = extract_ltime(geps_fcst, leadtime_length, ini_time, 0)

geps_stacked = xr.concat(
    [
        # For each lead time, extract the series, drop the time coordinate and add a new dimension with the lead number
        extract_ltime(geps_fcst, leadtime_length, ini_time, i).drop_vars('time')
        for i in range(n_leadtimes)
    ],
    'lead'
)
# sdba will want a normal time coordinate, so we assign a reference one.
geps_stacked = geps_stacked.assign_coords(time=geps_ltime_0.time)

# read eqm parameters for this lead time 
# PB: I am assuming the trained data has the same shape as the simulated data (except for the time dimension of course)
# PB: Same idea as above, we open each dataset and assign a new "lead time " coord:
eqm_pars = xr.concat(
    [
        xr.open_dataset(
            os.path.join(path_eqm_par_folder,"Leadtime_"+str(i_ltime + 1) + "_EQM.nc")
        ).expand_dims(lead=[i_ltime])
        for i_ltime in range(n_leadtimes)
    ],
    'lead'
) 

geps_uncorrect = geps_stacked["PC_NWP"]

#create EQM model from the stacked parameters
QM_model = sdba.EmpiricalQuantileMapping.from_dataset(eqm_pars)

# correct geps forecast 
geps_corrected = QM_model.adjust(geps_uncorrect, extrapolation="constant", interp="linear") 
geps_corrected = geps_corrected.fillna(0)
geps_corrected = xr.where((geps_corrected > 1000), 0, geps_corrected)

# Unstack the lead times into a dictionary, keys will be the lead time as a string:
out = {}
for i_ltime in geps_corrected.lead.values:
    ltime_wrong = geps_corrected.sel(lead=i_ltime)
    # shift the reference (lead-0) time axis to this lead time's window
    ltime_fixed = ltime_wrong.assign_coords(
        time=ltime_wrong.time + np.timedelta64(leadtime_length * i_ltime, 'h')
    )
    # strftime needs a format string; this one is just an example
    timestamp = ltime_fixed.time[0].dt.strftime("%Y-%m-%dT%H").item()
    out[timestamp] = ltime_fixed

I know this last suggestion is a bit complicated, but if (1) works well, this might yield better performance than using dask, considering the small size of your data.

dustming commented 1 year ago

Hi @aulemahal @kckornelsen, this is super interesting! We obtained EQM parameters for each station and realization one by one and then combined them into one nc file. This is why the parameter file has a different structure than the GEPS forecast data.

We want to follow your instructions and apply the code from (1) Vectorize the computation. The problem is that I am unsure how to properly create an EQM parameter file with the same structure as the forecast data.

@aulemahal Would you please provide a simple example that creates the EQM parameters across the different dimensions?

Thank you in advance! Ming

PS: The following is the EQM parameter file I have. The dimension "station_ens" is created from the unique combination of stations and realizations.

Dimensions:      (quantiles: 20, station_ens: 3255, month: 12)
Coordinates:
  * quantiles    (quantiles) float32 0.025 0.075 0.125 ... 0.875 0.925 0.975
  * month        (month) int64 1 2 3 4 5 6 7 8 9 10 11 12
  * station_ens  (station_ens) int32 0 1 2 3 4 5 ... 3404 3405 3406 3407 3408
Data variables:
    af           (station_ens, month, quantiles) float64 ...
    hist_q       (station_ens, month, quantiles) float64 ...
Attributes:
    group:               time.month
    group_compute_dims:  time
    group_window:        1
    _xclim_adjustment:   {"py/object": "xclim.sdba.adjustment.EmpiricalQuanti...
    adj_params:          EmpiricalQuantileMapping(group=Grouper(name='time.mo...

Here is how I create the EQM parameter file for each leadtime.

    # read in leadtime i time series
    par_list = []   # collect the trained parameters for this lead time
    for i_stations in range(0,155):
        for i_ens in range(0,21):
            # create a unique index for each station and realization 
            idx = i_stations*22 + i_ens

            # select data of current station and realization for GEPS and CAPA
            geps_ltime_i_sel = geps_ltime_i.sel(realization = i_ens,stations=i_stations)
            capa_ltime_i_sel = capa_ltime_i.sel(stations=i_stations)
            # obtain training dataset 
            geps_ltime_i_data = geps_ltime_i_sel['PC_NWP']
            capa_ltime_i_data = capa_ltime_i_sel['PC_cpa']

            # obtain trained EQM parameters 
            QM_mo = sdba.EmpiricalQuantileMapping.train(capa_ltime_i_data, geps_ltime_i_data, nquantiles=20, kind='*', group='time.month')

            # add a new dimension 
            QM_mo_dim = QM_mo.ds.expand_dims("station_ens").assign_coords(station_ens=("station_ens", [idx]))
            # store the parameter for this station and realization into an EQM parameter list 
            par_list.append(QM_mo_dim) 

    # Combine parameters in the EQM parameter list into one xarray 
    eqm_par_leadtime = xr.concat(par_list,"station_ens")
    # save the combined EQM parameter xarray to nc
    eqm_par_leadtime.to_netcdf(os.path.join(path_data_EQM,"Leadtime_"+str(i_ltime + 1) + "_EQM.nc"))
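For illustration, here is a hypothetical, untested sketch of how this flat station_ens index could later be unstacked back into separate stations and realization dimensions, assuming idx = i_stations*22 + i_ens as in the loop above:

idx = eqm_par_leadtime["station_ens"].values
eqm_par_2d = (
    eqm_par_leadtime
    .assign_coords(
        stations=("station_ens", idx // 22),      # recover the station index
        realization=("station_ens", idx % 22),    # recover the realization index
    )
    .set_index(station_ens=["stations", "realization"])
    .unstack("station_ens")
)
# eqm_par_2d should now have dims (stations, realization, month, quantiles),
# i.e. the same station/realization layout as the forecast.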

dustming commented 1 year ago

Hi @aulemahal, I modified the existing xclim examples and tested the vectorized approach from (1). It seems to have worked. Here is the example code. Would you please let me know if you notice something wrong with it? Thanks, Ming

import numpy as np
import xarray as xr
from xclim import sdba

t = xr.cftime_range("2000-01-01", "2030-12-31", freq="D", calendar="noleap")
vals = np.random.randint(0, 1000, size=(t.size,2,2)) / 100
vals_ref = (4 ** np.where(vals < 9, vals / 100, vals)) / 3e6
vals_sim = (
    (1 + 0.1 * np.random.random_sample((t.size,2,2)))
    * (4 ** np.where(vals < 9.5, vals / 100, vals))
    / 3e6
)
pr_ref = xr.DataArray(
    vals_ref, coords={"time": t,"realization":[1,2],"lat":('station',[2,3])}, dims=("time","realization","station"), attrs={"units": "mm/day"}
)
pr_ref = pr_ref.sel(time=slice("2000", "2015"))
pr_sim = xr.DataArray(
    vals_sim, coords={"time": t,"realization":[1,2],"lat":('station',[2,3])}, dims=("time","realization","station"), attrs={"units": "mm/day"}
)
pr_hist = pr_sim.sel(time=slice("2000", "2015"))

sim_ad, pth, dP0 = sdba.processing.adapt_freq(
    pr_ref, pr_sim, thresh="0.05 mm d-1", group="time"
)
QM_ad = sdba.EmpiricalQuantileMapping.train(
    pr_ref, sim_ad, nquantiles=15, kind="*", group="time"
)
scen_ad = QM_ad.adjust(pr_sim)
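A quick, untested check that the adjustment kept the extra dimensions instead of collapsing them:

print(scen_ad.dims)    # expected to include 'time', 'realization' and 'station'
print(scen_ad.sizes)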