NOAA-EMC / global-workflow

Global Superstructure/Workflow supporting the Global Forecast System (GFS)
https://global-workflow.readthedocs.io/en/latest
GNU Lesser General Public License v3.0

Stage initial conditions stored on HPSS #2988

Open aerorahul opened 2 weeks ago

aerorahul commented 2 weeks ago

What new functionality do you need?

Most jobs require the initial conditions to be available on local disk. The stage_ic job copies (stages) these initial conditions into the experiment's COM directory. This issue extends that functionality to also copy from HPSS into COM on machines that can access HPSS.

What are the requirements for the new functionality?

The requirements for this functionality lie in 2 spaces:

Acceptance Criteria

Suggest a solution (optional)

No response

aerorahul commented 2 weeks ago

An example YAML extending stage_atm_cold.yaml:

untar:
    tarball : "{{ ATARDIR }}/{{ cycle_YMDH }}/atm_cold.tar"
    on_hpss: True
    contents:
        - gfs_ctrl.nc
        {% for ftype in ["gfs_data", "sfc_data"] %}
        {% for ntile in range(1, ntiles + 1) %}
        - {{ ftype }}.tile{{ ntile }}.nc
        {% endfor %} # ntile
        {% endfor %} # ftype
    destination: "{{ DATA }}"
atmosphere_cold:
    copy:
        - ["{{ DATA }}/gfs_ctrl.nc", "{{ COMOUT_ATMOS_INPUT }}"]
        {% for ftype in ["gfs_data", "sfc_data"] %}
        {% for ntile in range(1, ntiles + 1) %}
        - ["{{ DATA }}/{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_INPUT }}"]
        {% endfor %} # ntile
        {% endfor %} # ftype
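
To make the two Jinja loops above concrete, here is a minimal plain-Python sketch of the file names they would expand to. It assumes ntiles is 6, the value exported in the fetch.sh settings in this thread. The function name expand_cold_start_files is hypothetical and is not part of global-workflow or wxflow.

```python
from typing import List


def expand_cold_start_files(ntiles: int) -> List[str]:
    """Mirror the Jinja loops in the YAML: one control file plus one
    gfs_data and one sfc_data file per tile."""
    files = ["gfs_ctrl.nc"]
    for ftype in ["gfs_data", "sfc_data"]:
        for ntile in range(1, ntiles + 1):
            files.append(f"{ftype}.tile{ntile}.nc")
    return files


if __name__ == "__main__":
    # Print the expanded list for a 6-tile configuration (assumed value)
    for name in expand_cold_start_files(6):
        print(name)
```

With 6 tiles this gives 13 entries: gfs_ctrl.nc followed by six gfs_data files and six sfc_data files. The same names appear in both the untar contents list and the copy list.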

DavidHuber-NOAA commented 2 weeks ago

Here are example scripts and a stub fetch.py module. To run this, modify the settings in fetch.sh and either launch it directly (./fetch.sh) or submit it to Slurm (sbatch fetch.sh).

After copying all of these files into a directory, make a subdirectory named fetch and place @aerorahul's YAML in it. Next, clone wxflow into the directory: git clone git@github.com:NOAA-EMC/wxflow.

fetch.sh

#!/usr/bin/env bash

#SBATCH -A fv3-cpu
#SBATCH -t 06:00:00
#SBATCH -p service
#SBATCH -n 1
#SBATCH -J fetch_from_hpss
#SBATCH -o fetch.out
#SBATCH --open-mode truncate

### ADJUST THESE SETTINGS FOR YOUR CASE ###
# Get the location where this script exists
script_dir="/scratch1/NCEPDEV/global/David.Huber/fetch"  # Change to your local directory

# Set the name of the yaml we will be working with
export fetch_yaml="stage_atm_cold.yaml"

# Set the directory on HPSS where data will be pulled from
export ATARDIR="/NCEPDEV/emc-global/1year/${USER}/test_data"

# Date (YYYYMMDDHH) for which the data is valid
export CDATE=2024100100

##########

# Load modules
module use /scratch1/NCEPDEV/nems/role.epic/spack-stack/spack-stack-1.6.0/envs/gsi-addon-dev-rocky8/install/modulefiles/Core
module load stack-intel stack-python
module load py-pyyaml
module load py-jinja2
module load py-python-dateutil
module load hpss

# Set job variables
export jobid="fetch.$$"
export pslot="fetch_test"
export COMROOT="/scratch1/NCEPDEV/stmp2/${USER}/COMROOT"
export ROTDIR="${COMROOT}/${pslot}"
export DATAROOT="/scratch1/NCEPDEV/stmp2/${USER}/RUNDIRS/${pslot}"
export DATA="${DATAROOT}/${jobid}"
export RUN="gdas"
export NET="gfs"
export SDATE=${CDATE}
export EDATE=${CDATE}
export PDY="${CDATE:0:8}"
export cyc="${CDATE:8:2}"
export cycle="t${cyc}z"
export RUN_ENVIR=emc
export assim_freq=6
export ntiles=6

export HOMEgfs="${script_dir}"
export PARMgfs="${script_dir}"

# Extend the PYTHONPATH into wxflow
wxflowPATH="${HOMEgfs}/wxflow/src"
PYTHONPATH="${PYTHONPATH}:${script_dir}:${wxflowPATH}"
export PYTHONPATH

declare -rx COMOUT_ATMOS_INPUT="${ROTDIR}/${RUN}.${PDY}/${cyc}/model/atmos/input"

# Now call exglobal_fetch.py
./exglobal_fetch.py
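
The PDY and cyc values that fetch.sh derives from CDATE can also be checked on the Python side. The following is a minimal sketch under that assumption; split_cdate is a hypothetical helper, not an existing wxflow or global-workflow function.

```python
from datetime import datetime
from typing import Tuple


def split_cdate(cdate: str) -> Tuple[str, str, datetime]:
    """Split a YYYYMMDDHH string into PDY (YYYYMMDD), cyc (HH) and a datetime."""
    # Reject anything that is not exactly 10 digits before parsing
    if len(cdate) != 10 or not cdate.isdigit():
        raise ValueError(f"CDATE must be 10 digits (YYYYMMDDHH), got {cdate!r}")
    # strptime raises ValueError for impossible dates or hours
    valid = datetime.strptime(cdate, "%Y%m%d%H")
    return valid.strftime("%Y%m%d"), valid.strftime("%H"), valid


if __name__ == "__main__":
    pdy, cyc, valid = split_cdate("2024100100")
    print(pdy, cyc, valid.isoformat())
```

Parsing through datetime rather than slicing alone catches values such as an hour of 25 before any HPSS request is made.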

exglobal_fetch.py

#!/usr/bin/env python3

import os

from fetch import Fetch
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit

# initialize root logger
logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)

@logit(logger)
def main():

    config = cast_strdict_as_dtypedict(os.environ)

    # Instantiate the Fetch object
    fetch = Fetch(config)

    # Pull out all the configuration keys needed to run the fetch step
    keys = ['current_cycle', 'RUN', 'PDY', 'PARMgfs', 'PSLOT', 'ROTDIR', 'fetch_yaml', 'ATARDIR', 'ntiles']

    fetch_dict = AttrDict()
    for key in keys:
        fetch_dict[key] = fetch.task_config.get(key)
        if fetch_dict[key] is None:
            print(f"Warning: key ({key}) not found in task_config!")

    # Also import all COMOUT* directory and template variables
    for key in fetch.task_config.keys():
        if key.startswith("COMOUT_"):
            fetch_dict[key] = fetch.task_config.get(key)
            if fetch_dict[key] is None:
                print(f"Warning: key ({key}) not found in task_config!")

    # Determine which archives to retrieve from HPSS
    # Read the input YAML file to get the list of tarballs on tape
    atardir_set = fetch.configure(fetch_dict)

    # Pull the data from tape (or from local disk) and store it at the specified destination
    fetch.execute_pull_data(atardir_set)

if __name__ == '__main__':
    main()
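
The key-collection step in main() can be tried in isolation with a plain dictionary standing in for fetch.task_config. This is only a sketch: collect_fetch_keys is a hypothetical name, and the sample configuration values are assumptions for illustration.

```python
from typing import Any, Dict, List, Tuple


def collect_fetch_keys(task_config: Dict[str, Any],
                       keys: List[str]) -> Tuple[Dict[str, Any], List[str]]:
    """Copy the requested keys and every COMOUT_* key; report the missing ones."""
    fetch_dict = {key: task_config.get(key) for key in keys}
    # Pick up all COMOUT_* entries, as the script above does
    for key, value in task_config.items():
        if key.startswith("COMOUT_"):
            fetch_dict[key] = value
    # Keys that were requested but resolved to None
    missing = [key for key, value in fetch_dict.items() if value is None]
    return fetch_dict, missing


if __name__ == "__main__":
    sample = {"RUN": "gdas", "pslot": "fetch_test",
              "COMOUT_ATMOS_INPUT": "/path/to/input"}  # assumed sample values
    fetch_dict, missing = collect_fetch_keys(sample, ["RUN", "PSLOT"])
    print(fetch_dict)
    print("missing:", missing)
```

Note that fetch.sh exports pslot in lowercase while the key list asks for PSLOT, so that key would presumably trigger the warning unless PSLOT is set elsewhere in the environment.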

Stub of fetch.py

#!/usr/bin/env python3

import os
from logging import getLogger
from typing import Any, Dict, List

from wxflow import (AttrDict, FileHandler, Hsi, Task,
                    logit, parse_j2yaml)

logger = getLogger(__name__.split('.')[-1])

class Fetch(Task):
    """Task to pull ROTDIR data from HPSS (or locally)
    """

    @logit(logger, name="Fetch")
    def __init__(self, config: Dict[str, Any]) -> None:
        """Constructor for the Fetch task
        The constructor is responsible for collecting necessary yamls based on
        the runtime options and RUN.

        Parameters
        ----------
        config : Dict[str, Any]
            Incoming configuration for the task from the environment

        Returns
        -------
        None
        """
        super().__init__(config)

        # Perhaps add other stuff to self.

    @logit(logger)
    def configure(self, fetch_dict: Dict[str, Any]) -> Dict[str, Any]:
        """Determine which tarballs will need to be extracted.

        Parameters
        ----------
        fetch_dict : Dict[str, Any]
            Task specific keys, e.g. COM directories, etc

        Return
        ------
        ?: Dict[str, Any]
            Dictionary derived from the yaml file with necessary HPSS info.
        """

        self.hsi = Hsi()

        fetch_yaml = fetch_dict.fetch_yaml
        fetch_parm = os.path.join(fetch_dict.PARMgfs, "fetch")

        parsed_fetch = parse_j2yaml(os.path.join(fetch_parm, fetch_yaml),
                                    fetch_dict)

        return {}  # Needed parameters from parsed_fetch

    @logit(logger)
    def execute_pull_data(self, atardir_set: Dict[str, Any]) -> None:
        """Pull data from HPSS based on a yaml dict.

        Parameters
        ----------
        atardir_set: Dict[str, Any]
            Dict defining set of tarballs to pull and where to put them.

        Return
        ------
        None
        """

        # Pull the data and place it where it needs to go

    # Other helper methods...
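
For the "or locally" case mentioned in the class docstring, the body of execute_pull_data could look roughly like the sketch below. It uses only the standard-library tarfile module and handles a tarball that already sits on local disk; pulling from HPSS would need htar/hsi and is not shown. The function name extract_local and its arguments are hypothetical and simply mirror the tarball, contents and destination fields of the example YAML.

```python
import os
import tarfile
from typing import List


def extract_local(tarball: str, contents: List[str], destination: str) -> List[str]:
    """Extract only the listed members of a local tarball into destination."""
    os.makedirs(destination, exist_ok=True)
    extracted = []
    with tarfile.open(tarball, "r") as tar:
        # Fail early if any requested member is absent from the archive
        available = set(tar.getnames())
        missing = [name for name in contents if name not in available]
        if missing:
            raise FileNotFoundError(f"Not in {tarball}: {missing}")
        for name in contents:
            tar.extract(name, path=destination)
            extracted.append(os.path.join(destination, name))
    return extracted
```

Checking for missing members before extracting anything means a partially populated destination directory is less likely to be left behind when the tarball is incomplete.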

DavidHuber-NOAA commented 2 weeks ago

@DavidGrumm-NOAA I updated the sample scripts above after testing. The script can now also be submitted to Slurm via sbatch.