Parsl / parsl

Parsl - a Python parallel scripting library
http://parsl-project.org
Apache License 2.0

App-specific site requirements #224

Open kylechard opened 6 years ago

kylechard commented 6 years ago

Many apps have specific resource requirements. At present, the only way to represent these requirements is through a collection of different sites. Ideally, we should provide an app catalog that can be used to specify these requirements independently of the site definitions.

This is an example from Joe Zuntz of what he would like to do:

# would like to run with at most this many nodes:
max_nodes = 20

# and the jobs require this many nodes and cores each:
nodes_and_cores_for_apps = {
    'app1': (4, 1),
    'app2': (1, 32),
    'app3': (8, 4),
    'app4': (8, 32),
}

@App('bash', dfk)
def app1(outputs=[]):
    return """
    mpirun -n 4 $SCRIPTS/task1 --output {outputs[0]}
    """

@App('bash', dfk)
def app2(inputs=[], outputs=[]):
    return """
    mpirun -n 32 $SCRIPTS/task2 --input {inputs[0]} --output {outputs[0]}
    """

@App('bash', dfk)
def app3(inputs=[], outputs=[]):
    return """
    mpirun -n 32 $SCRIPTS/task3 --input {inputs[0]} --output {outputs[0]}
    """

@App('bash', dfk)
def app4(inputs=[], outputs=[]):
    return """
    mpirun -n 256 $SCRIPTS/task4 --input {inputs[0]} --output {outputs[0]}
    """

future1 = app1(outputs=['1.txt'])
future2 = app2(inputs=future1.outputs, outputs=['2.txt'])
future3 = app3(inputs=future2.outputs, outputs=['3.txt'])
future4 = app4(inputs=future3.outputs, outputs=['4.txt'])

# wait for the final app in the chain to complete
future4.result()

djf604 commented 6 years ago

I can't upvote this enough.

annawoodard commented 6 years ago

The fundamental issue here is that we need a way to specify app-specific requirements. That is a critical problem that I think should be high priority. I'm not convinced that restricting ourselves to an app catalog is the way to go, though. These requirements could also be passed as parameters to the decorator. I understand the desire to keep code and execution separated, but decorator parameters would be more straightforward for users in many cases. One possibility would be to make all knobs optional parameters to the decorator, which a catalog could then override. Note that we've had a similar discussion about how the number of retries should be specified.
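
For concreteness, a minimal sketch of the decorator-parameter approach; the nodes, cores_per_node, and catalog keyword arguments are hypothetical names for illustration, not existing Parsl parameters:

# Hypothetical API sketch: per-app requirements declared on the decorator,
# with a catalog entry (if present) taking precedence over the defaults.
app_catalog = {
    'app2': {'nodes': 2, 'cores_per_node': 64},  # overrides the decorator defaults
}

@App('bash', dfk, nodes=1, cores_per_node=32, catalog=app_catalog)
def app2(inputs=[], outputs=[]):
    return 'mpirun -n 32 $SCRIPTS/task2 --input {inputs[0]} --output {outputs[0]}'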

annawoodard commented 6 years ago

Note this issue is related to #107.

joezuntz commented 6 years ago

For my specific use case we are programmatically generating apps, so it would be easy to specify this in the decorator. I don't know what would work for people more generally, of course.
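
As an illustration of that pattern, a small factory can stamp the apps out from the requirements table in the first comment (make_app is a hypothetical helper, not Parsl API):

# Hypothetical sketch: build one bash app per entry in the requirements table,
# so per-app requirements live in data rather than in repeated decorators.
def make_app(name, nodes, cores_per_node):
    @App('bash', dfk)  # per-app requirements would attach here once supported
    def _app(inputs=[], outputs=[]):
        return "mpirun -n %d $SCRIPTS/%s --output {outputs[0]}" % (
            nodes * cores_per_node, name)
    return _app

apps = {name: make_app(name, n, c)
        for name, (n, c) in nodes_and_cores_for_apps.items()}

Calling apps['app1'](outputs=['1.txt']) would then behave like the hand-written version above.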

yadudoc commented 6 years ago

@joezuntz I've sent you these same scripts over Slack. Let me know if this gets us closer to a workable solution for you. There are two files: test.py, which programmatically creates a few "sites", and cori.py, which contains a base template for a Cori configuration.

test.py

from parsl import *
import os
import copy
set_stream_logger()

# Catalog where you capture requirements of each app
app_catalog = {
    'app1': {"nodes" : 4,
             "cores_per_node" : 1},
    'app2': {"nodes" : 1,
             "cores_per_node" : 32},
    'app3': {"nodes" : 8,
             "cores_per_node" : 4},
    'app4': {"nodes" : 8,
             "cores_per_node" : 32},
}

# Get the Cori config template
from cori import coriBase
config = { "sites" : [],
           "globals": {"lazyErrors": True}
       }

# Construct a site definition matching each app's requirements.
# The generated sites differ only in the number of nodes per
# block requested from the scheduler.
for app in app_catalog:

    sitename = "Cori_{}N".format(app_catalog[app]["nodes"])
    app_catalog[app]["site"] = sitename

    # Do not recreate the site if an equivalent definition already exists
    if sitename in [sitedef["site"] for sitedef in config["sites"]]:
        continue
    tempsite = copy.deepcopy(coriBase)
    tempsite["site"] = sitename
    tempsite["execution"]["block"]["nodes"] = app_catalog[app]["nodes"]
    config["sites"].append(tempsite)

dfk = DataFlowKernel(config=config)

@App('bash', dfk, sites=[app_catalog['app1']["site"]])
def app1(nodes=app_catalog['app1']["nodes"], 
         cores_per_node=app_catalog['app1']["cores_per_node"], 
         scripts=None, outputs=[]):
    return """
    echo "srun --nodes={nodes} --ntasks={cores_per_node} {scripts}/mpi_hello" &>> {outputs[0]}
    srun --nodes={nodes} --ntasks=%s {scripts}/mpi_hello &>> {outputs[0]}
    """ % (nodes * cores_per_node)

@App('bash', dfk, sites=[app_catalog['app2']["site"]])
def app2(nodes=app_catalog['app2']["nodes"], 
         cores_per_node=app_catalog['app2']["cores_per_node"], 
         scripts=None, inputs=[], outputs=[]):
    return """
    echo "srun --nodes={nodes} --ntasks={cores_per_node} {scripts}/mpi_hello" &>> {outputs[0]}
    srun --nodes={nodes} --ntasks=%s {scripts}/mpi_hello &>> {outputs[0]}
    """% (nodes * cores_per_node)

@App('bash', dfk, sites=[app_catalog['app3']["site"]])
def app3(nodes=app_catalog['app3']["nodes"], 
         cores_per_node=app_catalog['app3']["cores_per_node"], 
         scripts=None, inputs=[], outputs=[]):
    return """
    echo "srun --nodes={nodes} --ntasks={cores_per_node} {scripts}/mpi_hello" &>> {outputs[0]}
    srun --nodes={nodes} --ntasks=%s {scripts}/mpi_hello &>> {outputs[0]}
    """% (nodes * cores_per_node)

@App('bash', dfk, sites=[app_catalog['app4']["site"]])
def app4(nodes=app_catalog['app4']["nodes"], 
         cores_per_node=app_catalog['app4']["cores_per_node"], 
         scripts=None, inputs=[], outputs=[]):
    return """
    echo "srun --nodes={nodes} --ntasks={cores_per_node} {scripts}/mpi_hello" &>> {outputs[0]}
    srun --nodes={nodes} --ntasks=%s {scripts}/mpi_hello &>> {outputs[0]}
    """% (nodes * cores_per_node)

scripts = '/global/homes/y/yadunand/mpi_apps'

future1 = app1(scripts=scripts, outputs=['1.txt'])
future2 = app2(scripts=scripts, inputs=future1.outputs, outputs=['2.txt'])
future3 = app3(scripts=scripts, inputs=future2.outputs, outputs=['3.txt'])
future4 = app4(scripts=scripts, inputs=future3.outputs, outputs=['4.txt'])

future4.result()

cori.py

import getpass
USERNAME = getpass.getuser()

# This is the template config for Cori that is modified on a per app basis.
coriBase =  {  "site": "Cori_Template",
               "auth": {
                   "channel": "local",
                   "hostname": "cori.nersc.gov",
                   "username": USERNAME,
                   "scriptDir": "/global/homes/y/{}/parsl_scripts".format(USERNAME),
               },
               "execution": {
                   "executor": "ipp",
                   "provider": "slurm", 
                   "block": {  # Definition of a block                                                         
                       "nodes": 1,            # of nodes in that block                                         
                       "taskBlocks": 1,       # total tasks in a block                                         
                       "walltime": "00:10:00",
                       "initBlocks": 0,
                       "maxBlocks": 1,
                       "options": {
                           "partition": "debug",
                           # Remember to update this if you are using a different Python version 
                           # on client side. Client and workers should have the same python env.
                           "overrides": """#SBATCH --constraint=haswell                                        
                           module load python/3.5-anaconda ; source activate parsl_env_3.5"""
                       }
                   }
               }
           }

# A full-fledged config with a few different pools of resources.
coriConfig = {
    "sites": [
        {  "site": "Cori_2N",
           "auth": {
               "channel": "local",
               "hostname": "cori.nersc.gov",
               "username": USERNAME,
               "scriptDir": "/global/homes/y/{}/parsl_scripts".format(USERNAME),
           },
           "execution": {
               "executor": "ipp",
               "provider": "slurm",
               "block": {  # Definition of a block                                                         
                   #"launcher": "singlenode",
                   "nodes": 2,            # of nodes in that block                                         
                   "taskBlocks": 1,       # total tasks in a block                                         
                   "walltime": "00:10:00",
                   "initBlocks": 0,
                   "maxBlocks": 1,
                   "options": {
                       "partition": "debug",
                       "overrides": """#SBATCH --constraint=haswell                                        
                       module load python/3.5-anaconda ; source activate parsl_env_3.5"""
                   }
               }
           }
         },
         {  "site": "Cori_1N",
            "auth": {
                "channel": "local",
                "hostname": "cori.nersc.gov",
                "username": USERNAME,
                "scriptDir": "/global/homes/y/{}/parsl_scripts".format(USERNAME),
            },
            "execution": {
                "executor": "ipp",
                "provider": "slurm",
                "block": {  # Definition of a block                                                         
                    #"launcher": "srun",
                    "nodes": 1,            # of nodes in that block                                         
                    "taskBlocks": 1,       # total tasks in a block                                         
                    "walltime": "00:10:00",
                    "initBlocks": 0,
                    "maxBlocks": 1,
                    "options": {
                        "partition": "debug",
                        "overrides": """#SBATCH --constraint=haswell                                        
                        module load python/3.5-anaconda ; source activate parsl_env_3.5"""
                    }
                }
            }
        },
        {  "site": "Cori_8N",
            "auth": {
                "channel": "local",
                "hostname": "cori.nersc.gov",
                "username": USERNAME,
                "scriptDir": "/global/homes/y/{}/parsl_scripts".format(USERNAME),
            },
            "execution": {
                "executor": "ipp",
                "provider": "slurm",
                "block": {  # Definition of a block                                                         
                    "launcher": "srun",
                    "nodes": 8,            # of nodes in that block                                         
                    "taskBlocks": 1,       # total tasks in a block                                         
                    "walltime": "00:10:00",
                    "initBlocks": 0,
                    "maxBlocks": 1,
                    "options": {
                        "partition": "debug",
                        "overrides": """#SBATCH --constraint=haswell                                        
                        module load python/3.5-anaconda ; source activate parsl_env_3.5"""
                    }
                }
            }
        }

    ],
    "globals": {"lazyErrors": True}
}
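
With a multi-pool config like coriConfig, each app can be pinned to the matching pool through the decorator's sites argument, just as test.py does above. A brief sketch (big_app is an illustrative name):

dfk = DataFlowKernel(config=coriConfig)

# Pin this app to the 8-node pool defined in coriConfig
@App('bash', dfk, sites=["Cori_8N"])
def big_app(scripts=None, outputs=[]):
    return "srun --nodes=8 --ntasks=32 {scripts}/mpi_hello &>> {outputs[0]}"
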
joezuntz commented 6 years ago

I've adapted the code that @yadudoc posted above and it's working well: I now generate sites dynamically from the list of tasks and requirements. I refactored it slightly away from an app catalog, since we are already programmatically generating our apps, but the basic idea is the same.

The only thing I'm not sure about is the walltime behaviour. If a Slurm job runs out of time, will Parsl start another one (presumably not if it died in the middle of a task)? Or should I specify wall times that are long enough for all the jobs on that site?

yadudoc commented 6 years ago

@joezuntz For this case we need to set retries > 1, so that jobs which fail from workers hitting the walltime will be re-launched. Whenever there is more work than workers, Parsl will try to get more compute from the site. I'd recommend you take a quick look at the execution documentation. Setting parallelism == 1 makes sense for long-running jobs like yours.
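
For reference, those knobs would appear in the dictionary-style config of this era roughly as below; key names and placement shifted between Parsl versions, so treat this as a sketch rather than exact syntax:

# Sketch only: retries assumed to sit alongside lazyErrors in globals;
# parallelism is tuned where the site's block is defined (see execution docs).
config = {
    "sites": [coriBase],      # site definitions as in cori.py above
    "globals": {
        "lazyErrors": True,
        "retries": 2,         # re-launch tasks that fail, e.g. on walltime expiry
    },
}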

If you don't mind, could you link to your code here? I'm guessing this pattern would be useful in general.