geodesymiami / rsmas_insar

RSMAS InSAR code
https://rsmas-insar.readthedocs.io/
GNU General Public License v3.0

Job submission: add parallel queue plus module for default job submission parameters #81

Closed: falkamelung closed this issue 3 years ago

falkamelung commented 5 years ago
_process_utilities.py:sentinelStack.slcDir                      = auto         # [SLCs dir]
_process_utilities.py:sentinelStack.orbitDir                    = auto         # [$SENTINEL_ORBITS]
_process_utilities.py:sentinelStack.auxDir                      = auto         # [$SENTINEL_AUX]
_process_utilities.py:sentinelStack.demDir                      = auto         # [DEM file dir]
_process_utilities.py:sentinelStack.master                      = auto         # [Master acquisition]

or:

_process_utilities.py:         TemplateTuple('sentinelStack.master', 'masterDir', None),
_process_utilities.py:         TemplateTuple('sentinelStack.numConnections', 'numConnections', 3),
_process_utilities.py:         TemplateTuple('sentinelStack.numOverlapConnections', 'numOverlapConnections', 3),
_process_utilities.py:         TemplateTuple('sentinelStack.textCmd', 'textCmd', '\'\''),
_process_utilities.py:         TemplateTuple('sentinelStack.excludeDate', 'excludeDate', None),
_process_utilities.py:         TemplateTuple('sentinelStack.includeDate', 'includeDate', None),
_process_utilities.py:         TemplateTuple('sentinelStack.azimuthLooks', 'azimuthLooks', 3),
_process_utilities.py:         TemplateTuple('sentinelStack.rangeLooks', 'rangeLooks', 9),
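
As an aside, a guess at how such a TemplateTuple table might be defined and applied (the namedtuple fields and the apply function below are assumptions for illustration; the actual definition in _process_utilities.py is not shown here):

from collections import namedtuple

# Assumed fields: template key, destination attribute name, default value.
TemplateTuple = namedtuple('TemplateTuple', ['key', 'attr', 'default'])

required_template_vals = [
    TemplateTuple('sentinelStack.numConnections', 'numConnections', 3),
    TemplateTuple('sentinelStack.azimuthLooks', 'azimuthLooks', 3),
    TemplateTuple('sentinelStack.rangeLooks', 'rangeLooks', 9),
]

def apply_template_defaults(inps, template):
    """Hypothetical sketch: copy template values onto inps, falling back
    to each tuple's default when a key is missing or set to 'auto'."""
    for item in required_template_vals:
        value = template.get(item.key, 'auto')
        setattr(inps, item.attr, item.default if value in ('auto', None, '') else value)
    return inps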

or:

//login3/nethome/famelung/test/test2/rsmas_insar/sources/PySAR[1049] grep pysar.network */*py
pysar/ifgram_inversion.py:key_prefix = 'pysar.networkInversion.'
pysar/ifgram_inversion.py:pysar.networkInversion.weightFunc      = auto #[var / fim / coh / no], auto for var
pysar/ifgram_inversion.py:pysar.networkInversion.maskDataset     = auto #[coherence / connectComponent / no], auto for no
pysar/ifgram_inversion.py:pysar.networkInversion.maskThreshold   = auto #[0-1], auto for 0.4
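
For illustration, a minimal sketch (not code from either repository; the function name and the defaults dict are hypothetical) of how such prefixed "key = value  # comment" template options with 'auto' values could be resolved:

def read_template_options(template_file, key_prefix, defaults):
    """Hypothetical sketch: parse 'key = value  # comment' lines, keep only
    keys starting with key_prefix, and substitute defaults for 'auto'."""
    options = dict(defaults)
    with open(template_file) as f:
        for line in f:
            line = line.split('#')[0].strip()          # drop trailing comments
            if '=' not in line or not line.startswith(key_prefix):
                continue
            key, value = [part.strip() for part in line.split('=', 1)]
            if value and value != 'auto':
                options[key] = value
    return options

# e.g. (hypothetical file name):
# opts = read_template_options('project.template', 'pysar.networkInversion.',
#                              {'pysar.networkInversion.weightFunc': 'var',
#                               'pysar.networkInversion.maskDataset': 'no',
#                               'pysar.networkInversion.maskThreshold': 0.4})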

import os
import re
import glob
import numpy as np

Auto setting for the file structure of Univ. of Miami, as shown below.

It requires 3 conditions:
1) autoPath = True
2) $SCRATCHDIR is defined as an environment variable
3) the input custom template has the same basename as the project name

Change autoPath to False if you are not using this file structure.

autoPath = True

Default paths of data files from different InSAR processors to be loaded into PySAR:

isceAutoPath = '''##----------Default file path of ISCE-topsStack products
pysar.load.processor     = isce
pysar.load.metaFile      = ${PROJECT_DIR}/master/IW*.xml
pysar.load.baselineDir   = ${PROJECT_DIR}/baselines

pysar.load.unwFile       = ${PROJECT_DIR}/merged/interferograms/*/filt_*.unw
pysar.load.corFile       = ${PROJECT_DIR}/merged/interferograms/*/filt_*.cor
pysar.load.connCompFile  = ${PROJECT_DIR}/merged/interferograms/*/filt_*.unw.conncomp
pysar.load.ionoFile      = None
pysar.load.intFile       = None
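
As a side note, a minimal sketch (assuming the mechanism boils down to expanding ${PROJECT_DIR} and resolving glob patterns; the function below is illustrative, not PySAR's implementation) of how such a default-path block could be turned into concrete paths:

import glob
import os

def expand_auto_paths(auto_path_str, project_dir):
    """Illustrative sketch: expand ${PROJECT_DIR} in an auto-path block and
    resolve glob patterns to existing files, keeping the pattern otherwise."""
    os.environ['PROJECT_DIR'] = project_dir
    paths = {}
    for line in auto_path_str.splitlines():
        line = line.split('#')[0].strip()
        if '=' not in line:
            continue
        key, value = [part.strip() for part in line.split('=', 1)]
        if value in ('None', ''):
            paths[key] = None
            continue
        pattern = os.path.expandvars(value)
        matches = glob.glob(pattern)
        paths[key] = matches if matches else pattern   # keep pattern if nothing matches yet
    return paths

# e.g. expand_auto_paths(isceAutoPath, '/scratch/insarlab/SOME_PROJECT')   # path is hypothetical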

falkamelung commented 5 years ago

The memory assignment is currently done by a function in execute_stacksentinel_runfiles.py, shown in the lines below. In a similar way we would like to be able to set the walltime limit: most run files will use the same default walltime limit, but a few run files need a longer one. Likewise, we want the ability to set other job parameters as needed. (Looking forward: we may want to set these parameters per platform and queue, so this defaults function should be ready to accept platform and queue variables.)

**Important: for all of these parameters we need the ability to override the defaults by setting the corresponding option in the `.template` file.**

def set_memory_defaults():
    """ Sets an optimized memory value for each job. """

    memoryuse = {'unpack_slc_topo_master':'3700',
                 'average_baseline':'3700',
                 'extract_burst_overlaps':'3700',
                 'overlap_geo2rdr_resample':'4000',
                 'pairs_misreg':'3700',
                 'timeseries_misreg':'3700',
                 'geo2rdr_resample':'5000',
                 'extract_stack_valid_region':'3700',
                 'merge':'3700',
                 'merge_burst_igram': '3700',
                 'grid_baseline':'3700',
                 'generate_igram':'3700',
                 'filter_coherence':'6000',
                 'merge_master_slave_slc':'3700',
                 'unwrap':'3700'}

    return memoryuse
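
A minimal sketch of how the requested walltime defaults with `.template` overrides could look (the function names, keys, and values below are hypothetical placeholders, not the repository's implementation):

def set_walltime_defaults():
    """Hypothetical sketch: default walltime per run-file step, plus a
    catch-all 'default' entry for steps that are not listed explicitly."""
    walltime = {'default':                '2:00',
                'unpack_slc_topo_master': '4:00',
                'geo2rdr_resample':       '6:00',
                'unwrap':                 '8:00'}   # placeholder values
    return walltime

def apply_template_overrides(defaults, template, prefix='job.'):
    """Hypothetical sketch: let options such as 'job.unwrap.walltime' in the
    .template file override the hard-coded defaults."""
    overridden = dict(defaults)
    for key, value in template.items():
        if key.startswith(prefix) and key.endswith('.walltime'):
            step = key[len(prefix):-len('.walltime')]
            overridden[step] = value
    return overridden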
lily-wittle commented 5 years ago

@falkamelung I added support for the parallel queue in create_batch.py (using it if a queue is not given) and tried moving the job submission defaults to a separate file. I followed Yunjun's example from process_isce_stack.py in PySAR. I wasn't sure where you would want the .cfg file with defaults, but I can move it wherever. In create_batch.py, there were only the two defaults set for walltime and memory, but I can add more specific ones for different jobs if that is what we want. What I currently have is on https://github.com/geodesymiami/rsmas_insar/tree/job_submission_defaults
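
For reference, a minimal sketch of the configparser-based approach described above (the section names, keys, and values are placeholders, not necessarily what is on the branch):

from configparser import ConfigParser

# Hypothetical contents of the defaults .cfg file:
defaults_cfg = """
[DEFAULT]
walltime = 2:00
memory = 3700

[unwrap]
walltime = 8:00
memory = 6000
"""

config = ConfigParser()
config.read_string(defaults_cfg)

job_name = 'unwrap'
section = job_name if config.has_section(job_name) else 'DEFAULT'
walltime = config.get(section, 'walltime')
memory = config.get(section, 'memory')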

falkamelung commented 5 years ago

Great! I am glad we are making progress with this. Will there be a way to set different defaults for different platforms, schedulers and queues?

We sort of came to the conclusion that it may be better to use the pyyaml module, which is used by Dask (see below), but I am not sure. Sara @mirzaees may have worked on this, but I don't know how far she got. Please also look at run_operations for Josh's idea of how to use environment variables. Really beautiful code!

Finally, we were considering renaming create_batch to create_batch_jobs, submit_job_list.py, or submit_batch_jobs. Any suggestion on what would be appropriate, given the current function naming?

platform: pegasus
  scheduler: lsf
    queue: general
      default:
        memory: XX
        walltime: YYY
      run_1_topo_unpack:
        memory: XX
        walltime: YY
      run_8_unwrap:
      …
    queue: parallel

platform: atmosphere
  scheduler: slurm
    queue: general
      …

From run_operations.py (Josh's use of environment variables):

SCRATCH_DIRECTORY = os.getenv('SCRATCHDIR')
TEMPLATE_DIRECTORY = os.path.join(OPERATIONS_DIRECTORY, "TEMPLATES")
LOGS_DIRECTORY = os.path.join(OPERATIONS_DIRECTORY, "LOGS")
ERRORS_DIRECTORY = os.path.join(OPERATIONS_DIRECTORY, "ERRORS")
STORED_DATE_FILE = os.path.join(OPERATIONS_DIRECTORY, "stored_date.date")

DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
//login4/nethome/famelung/test/test2/rsmas_insar/sources/PySAR/pysar[1106] cat ./defaults/dask_pysar.yaml
jobqueue:
  lsf:
    queue: general
    project: insarlab
    name: pysar_bee

    # Dask worker options
    cores: 2                    # Total number of cores per job
    memory: 2GB                 # This parameter is ignored by Pegasus
    processes: 1                # Number of Python processes per job

    interface: ib0              # Network interface to use like eth0 or ib0
                                # ESSENTIAL PARAMETER for performance

    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null       # Location of fast local storage like /scratch or $TMPDIR

    # LSF resource manager options
    shebang: "#!/usr/bin/env bash"
    walltime: '00:30'
    extra: []
    env-extra: []
    ncpus: null
    mem: null
    # The first parameter is required by Pegasus. This parameter sets the
    # memory per node. The second parameter writes worker output to file.
    job-extra: ['-R "rusage[mem=2500]"',
                "-o worker_pysar.%J.o"]
    log-directory: null

########################################################
  ifgram_inversion:
    queue: general
    project: insarlab
    name: ifginv_bee

    # Dask worker options
    cores: 2                    # Total number of cores per job
    memory: 2GB                 # This parameter is ignored by Pegasus
    processes: 1                # Number of Python processes per job

    interface: ib0              # Network interface to use like eth0 or ib0
                                # ESSENTIAL PARAMETER for performance

    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null       # Location of fast local storage like /scratch or $TMPDIR

    # LSF resource manager options
    shebang: "#!/usr/bin/env bash"
    walltime: '00:30'
    extra: []
    env-extra: []
    ncpus: null
    mem: null
    # The first parameter is required by Pegasus. This parameter sets the
    # memory per node. The second parameter writes worker output to file.
    job-extra: ['-R "rusage[mem=6400]"',
                "-o worker_ifginv.%J.o"]
    log-directory: null

########################################################

  pbs:
    name: dask-worker

    # Dask worker options
    cores: null                 # Total number of cores per job
    memory: null                # Total amount of memory per job
    processes: 1                # Number of Python processes per job

    interface: null             # Network interface to use like eth0 or ib0
    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null       # Location of fast local storage like /scratch or $TMPDIR

    # PBS resource manager options
    shebang: "#!/usr/bin/env bash"
    queue: null
    project: null
    walltime: '00:30:00'
    extra: []
    env-extra: []
    resource-spec: null
    job-extra: []
    log-directory: null
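
For context, a minimal sketch of how such a dask-jobqueue section might be consumed (assuming dask_jobqueue is installed; picking the section via config_name follows the dask-jobqueue convention, and the override shown is illustrative):

from dask.distributed import Client
from dask_jobqueue import LSFCluster

# Use the 'ifgram_inversion' section of the jobqueue config above;
# keyword arguments override individual values from the file.
cluster = LSFCluster(config_name='ifgram_inversion', walltime='00:30')
cluster.scale(4)                 # request 4 LSF worker jobs

client = Client(cluster)
# ... submit dask computations here ...
client.close()
cluster.close()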
lily-wittle commented 5 years ago

@falkamelung I made some changes to use the pyyaml module instead of configparser -- both versions are in commits on https://github.com/geodesymiami/rsmas_insar/tree/job_submission_defaults

I wrote it with two different schedulers, LSF and PBS, but it would be easy to add more to the config file. I wasn't sure where the platform would be set; I saw that platforms.bash uses the environment variable $HOST, but that is set to login4 rather than pegasus, for example, so I didn't know whether it maps directly to a platform name. At any rate, you could add another section to the yaml for the platform (like in the first example you posted) if you want different default values for different platforms.
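
A minimal sketch of the pyyaml-based lookup described here (the file name and the exact nesting are assumptions; the real layout is in the job_submission_defaults branch):

import os
import yaml

# Hypothetical job_defaults.yaml layout:
#   pegasus:
#     LSF:
#       general:
#         walltime: '2:00'
#         memory: 3700
with open('job_defaults.yaml') as f:
    defaults = yaml.safe_load(f)

platform = os.getenv('PLATFORM', 'pegasus')        # platform name, however it ends up being set
scheduler = os.getenv('JOBSCHEDULER', 'LSF')
params = defaults[platform][scheduler]['general']
walltime, memory = params['walltime'], params['memory']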

As for renaming create_batch.py, it also has functions for submitting a single job (including a script like process_rsmas or download_rsmas) so I think maybe something like job_submission.py might be more appropriate?

falkamelung commented 5 years ago

@lily-wittle I think we should have an environment variable PLATFORM

It requires a minor modification in platforms.bash. I could commit this right now. Maybe just assume it exists.

echo "sourcing $PWD/bashfiles/platforms.bash ..."

if [[ (${HOST} == login3) || (${HOST} == login4) || (${HOST} =~ vis) ]]
then
  export PLATFORM=pegasus
  export JOBSCHEDULER=LSF
lily-wittle commented 5 years ago

@falkamelung that sounds like a good idea. I added support for platforms to the job_submission_defaults branch.

Also just a note on a possible bug: I might've done something wrong, but I reinstalled a new version of the code yesterday and found that after sourcing all the bash files, $HOST was not set (and therefore $JOBSCHEDULER, etc. were not set). I was able to manually export HOST=login4 but not sure where this was supposed to be set and if it will be an issue in the future.
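
One possible guard for the missing $HOST (an assumption, not a committed fix): the Python side could fall back to the short hostname when the variable is not exported:

import os
import socket

# Fall back to the short hostname (e.g. 'login4') if platforms.bash did not export HOST.
host = os.getenv('HOST') or socket.gethostname().split('.')[0]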