flux-framework / flux-core

core services for the Flux resource management framework
GNU Lesser General Public License v3.0

Converting a CLI flux workflow to being Python based #4325

Closed rcarson3 closed 10 months ago

rcarson3 commented 2 years ago

Hi, I'm working on a Python workflow as part of the ECP ExaAM project that uses DEAP to optimize a set of parameters based on "experimental" observations. Currently, this workflow runs in a serial manner, where we use Python's subprocess call to run 'ExaConstit' on N processors. In the past, I've worked with your team to create a Flux-based workflow for a major component of the ExaAM workflow that made use of the CLI side of things. I found that that workflow worked fairly well on Lassen and Summit, and at some point I hope to test it on Crusher or one of LLNL's EA systems.

My general workflow there looked something like:

Job submission script submitted to LSF

# Bunch of stuff here:
PMIX_MCA_gds="^ds12,ds21" jsrun -a 1 -c ALL_CPUS -g ALL_GPUS -n ${NUM_NODES} --bind=none --smpiargs="-disable_gpu_hooks" flux start -o,-Slog-filename=out ./job_batch.sh

where job_batch.sh looked something like:

#! /bin/bash

echo ${FLUX_URI}

SCRIPT=$(readlink -f "$0")
BASE_DIR=$(dirname "$SCRIPT")

cd ${BASE_DIR}/runs/some_location/
#Below is something on the scale that we'd use for the full scale simulation
#we'd run for the challenge problem
flux mini batch --nslots=6 --cores-per-slot=40 --nodes=6 ./hip_mechanics.flux
# cd and submit all of our other locations

cd ${BASE_DIR}
flux jobs -a
flux queue drain
flux jobs -a

where hip_mechanics.flux looked something like:

#! /bin/bash

echo ${FLUX_URI}

flux resource list
#Below is something on the scale that we'd use for the full scale simulation
#we'd run for the challenge problem
flux mini run -N 6 -n 24 -g 1 -o gpu-affinity=per-task -o mpi=spectrum ./mechanics -opt voce_fepx.toml
flux resource list

I'm not too sure how best to replicate most of this in Python. I found https://github.com/flux-framework/flux-workflow-examples/blob/master/job-submit-wait/submitter_wait_in_order.py, which looks pretty useful in that it would probably allow me to do something like the job_batch.sh portion of things, I'm guessing? It looks like I can also change the working directory for each job submission using something like compute_jobspec.cwd = os.getcwd(), right? If so, I'm guessing that would replace how I cd into the various job directories in my original script and then submit a job.

However, it's not clear to me what is expected as JobspecV1.from_command in order to replicate the hip_mechanics.flux portion of things that I was doing earlier.

So, I would very much appreciate any guidance your team might have on the matter.

jameshcorbett commented 2 years ago

I went ahead and mocked up Python equivalents for your scripts. Here's job_batch.sh, which it sounds like you had already figured out (yeah you can set the working directory with the .cwd attribute):

import os

import flux
import flux.job

def main():
    print(os.environ["FLUX_URI"])
    # Directory containing this script, matching dirname "$SCRIPT" in job_batch.sh
    base_dir = os.path.dirname(os.path.realpath(__file__))
    fh = flux.Flux()
    directories = [
        os.path.join(base_dir, "runs", "some_location"),
    ]  # fill this list with all the directories you want to submit in
    jobids = []
    for dirname in directories:
        jobspec = flux.job.JobspecV1.from_nest_command(
            [os.path.join(dirname, "hip_mechanics.flux")],
            num_slots=6,
            cores_per_slot=40,
            num_nodes=6,
        )
        jobspec.cwd = dirname
        jobspec.stdout = "/path/to/some/file"  # TODO: fill in
        jobspec.stderr = "/path/to/some/file"
        jobids.append(flux.job.submit(fh, jobspec, waitable=True))
    print(flux.job.job_list(fh))
    for j in jobids:
        flux.job.wait(fh, j)
    print(flux.job.job_list(fh))

if __name__ == '__main__':
    main()

And here is hip_mechanics.flux:

import os

import flux
import flux.job
import flux.resource

def main():
    print(os.environ["FLUX_URI"])
    fh = flux.Flux()
    print(flux.resource.resource_list(fh))
    jobspec = flux.job.JobSpecV1.from_command(
        ["./mechanics", "-opt", "voce_fepx.toml"],
        num_nodes=6,
        num_tasks=24,
        gpus_per_task=1
    )
    jobspec.setattr_shell_option("mpi", "spectrum")
    jobspec.setattr_shell_option("gpu-affinity", "per-task")
    jobspec.stdout = "/path/to/some/file"  # TODO: fill in
    jobspec.stderr = "/path/to/some/file"
    flux.job.wait(fh, flux.job.submit(fh, jobspec, waitable=True))
    print(flux.resource.resource_list(fh))

if __name__ == '__main__':
    main()

I didn't actually run these scripts, so it could be I missed something minor, but I think they do everything your shell scripts do. However, some of the print statements might be a bit ugly compared to the CLI, because you'll be printing out the unformatted data (e.g. from resource_list) that the CLI pretty-prints.

Let me know if you have any questions.

jameshcorbett commented 2 years ago

By the way, is there a reason that job_batch doesn't look like this:


#! /bin/bash

echo ${FLUX_URI}

SCRIPT=$(readlink -f "$0")
BASE_DIR=$(dirname "$SCRIPT")

cd ${BASE_DIR}/runs/some_location/
#Below is something on the scale that we'd use for the full scale simulation
#we'd run for the challenge problem
flux mini submit -N 6 -n 24 -g 1 -o gpu-affinity=per-task -o mpi=spectrum ./mechanics -opt voce_fepx.toml
# cd and submit all of our other locations

cd ${BASE_DIR}
flux jobs -a
flux queue drain
flux jobs -a

All I did was cut out hip_mechanics.flux and place it directly into job_batch.sh. I think it ends up being the same functionality (aside from some print statements) and I think it's a bit simpler.

rcarson3 commented 2 years ago

@jameshcorbett thanks so much for this! This is perfect and should allow me to get a working prototype up and running quite quickly.

flux mini submit -N 6 -n 24 -g 1 -o gpu-affinity=per-task -o mpi=spectrum ./mechanics -opt voce_fepx.toml

I actually wasn't aware of this way of doing things. It would definitely simplify a number of things, and I'll definitely try it out in my main workflow 😃

rcarson3 commented 2 years ago

@jameshcorbett so I'm currently testing my workflow / optimization tool on Summit and am running into some issues on the flux side of things. So, my python flux driver looks like the below:

import os
import os.path
import subprocess
import sys
import re
import numpy as np

import flux
import flux.job

from ExaConstit_Logger import write_ExaProb_log

def map_custom(problem, igeneration, genes):

    status = []
    f_objective = []
    fh = flux.Flux()

    # Preprocess all of the genes first
    for igene, gene in enumerate(genes):
        problem.preprocess(gene, igeneration, igene)

    # Submit all of the flux jobs
    jobids = []
    for igene, gene in enumerate(genes):
        istatus = []
        for iobj in range(problem.n_obj):
            gene_dir = "gen_" + str(igeneration)
            fdir = os.path.join(problem.workflow_dir, gene_dir, "")
            rve_name = "gene_" + str(igene) + "_obj_" + str(iobj)
            fdironl = os.path.join(fdir, rve_name, "")

            flux_obj = problem.job_script
            fh_output = os.path.join(fdironl, "optimization_out.txt")
            fh_error  = os.path.join(fdironl, "optimization_err.txt")
            modifiers = {"binary":problem.bin_mechanics, "nnodes":problem.nnodes, "ntasks":problem.ncpus, "ngpus":problem.ngpus,
            "output_name":fh_output, "error_name":fh_error}
            for iheader, repl_val in modifiers.items():
                search = "%%" + iheader + "%%"
                flux_obj = re.sub(search, str(repl_val), flux_obj)
            # Output toml file
            fh = os.path.join(fdironl, os.path.basename("mechanics.flux"))
            # Check to see if it is a symlink and if so remove the link
            if os.path.islink(fh):
                os.unlink(fh)
            # We can now safely write out the file
            with open(fh, "w") as f:
                f.write(flux_obj)

            jobspec = flux.job.JobspecV1.from_nest_command(
                [fh],
                num_slots=problem.nnodes,
                cores_per_slot=problem.ncpus,
                num_nodes=problem.nnodes,
            )

            jobspec.cwd = fdironl
            jobspec.stdout = fh_output
            jobspec.stderr = fh_error
            jobids.append(flux.job.submit(fh, jobspec, waitable=True))

    # Wait on all of our flux jobs to finish
    print(flux.job.job_list(fh))
    for j in jobids:
        jobid, istatus, errnum = flux.job.wait(fh, j)
        if not istatus:
            status.append(errnum)
        else:
            status.append(0)
    print(flux.job.job_list(fh))

    # Convert status back into a 2d array
    status = [
        status[i : i + problem.n_obj] for i in range(0, len(status), problem.n_obj)
    ]
    # Post-process all of the data last
    for igene, gene in enumerate(genes):
        f = problem.postprocess(igeneration, igene, status[igene])
        f_objective.append(np.copy(f))

    return f_objective

# Will want a custom way to handle one off launches for failed tests
def map_custom_fail(problem, igeneration, gene, igene):
    status = []
    f_objective = []
    fh = flux.Flux()

    # Preprocess all of the genes first
    problem.preprocess(gene, igeneration, igene)

    # Submit all of the flux jobs
    jobids = []

    istatus = []
    for iobj in range(problem.n_obj):
        gene_dir = "gen_" + str(igeneration)
        fdir = os.path.join(problem.workflow_dir, gene_dir, "")
        rve_name = "gene_" + str(igene) + "_obj_" + str(iobj)
        fdironl = os.path.join(fdir, rve_name, "")

        flux_obj = problem.job_script
        fh_output = os.path.join(fdironl, "optimization_out.txt")
        fh_error  = os.path.join(fdironl, "optimization_err.txt")
        modifiers = {"binary":problem.bin_mechanics, "nnodes":problem.nnodes, "ntasks":problem.ncpus, "ngpus":problem.ngpus,
        "output_name":fh_output, "error_name":fh_error}
        for iheader, repl_val in modifiers.items():
            search = "%%" + iheader + "%%"
            flux_obj = re.sub(search, str(repl_val), flux_obj)
        # Output toml file
        fh = os.path.join(fdironl, os.path.basename("mechanics.flux"))
        # Check to see if it is a symlink and if so remove the link
        if os.path.islink(fh):
            os.unlink(fh)
        # We can now safely write out the file
        with open(fh, "w") as f:
            f.write(flux_obj)

        jobspec = flux.job.JobspecV1.from_nest_command(
            [fh],
            num_slots=problem.nnodes,
            cores_per_slot=problem.ncpus,
            num_nodes=problem.nnodes,
        )

        jobspec.cwd = fdironl
        jobspec.stdout = fh_output
        jobspec.stderr = fh_error
        jobids.append(flux.job.submit(fh, jobspec, waitable=True))

    # Wait on all of our flux jobs to finish
    print(flux.job.job_list(fh))
    for j in jobids:
        jobid, istatus, errnum = flux.job.wait(fh, j)
        if not istatus:
            status.append(errnum)
        else:
            status.append(0)
    print(flux.job.job_list(fh))

    # Convert status back into a 2d array
    status = [
        status[i : i + problem.n_obj] for i in range(0, len(status), problem.n_obj)
    ]

    # Post-process all of the data last
    f = problem.postprocess(igeneration, igene, status[0])
    f_objective.append(np.copy(f))

    return f_objective

where mechanics.flux is auto-generated but for this particular problem looks like:

import os

import flux
import flux.job
import flux.resource

def main():
    print(os.environ["FLUX_URI"])
    fh = flux.Flux()
    print(flux.resource.resource_list(fh))
    jobspec = flux.job.JobspecV1.from_command(
        ["exaconstit-mechanics-v0_6_1", "-opt", "options.toml"],
        num_nodes=1,
        num_tasks=4,
        gpus_per_task=0
    )
    jobspec.setattr_shell_option("mpi", "spectrum")
    jobspec.setattr_shell_option("gpu-affinity", "per-task")
    jobspec.stdout = "ExaConstit_CP_Framework/Exaopt_DEAP/wf_files/gen_0/gene_0_obj_0/optimization_out.txt"  # TODO: fill in
    jobspec.stderr = "ExaConstit_CP_Framework/Exaopt_DEAP/wf_files/gen_0/gene_0_obj_0/optimization_err.txt"
    flux.job.wait(fh, flux.job.submit(jobspec, waitable=True))
    print(flux.resource.resource_list(fh))

if __name__ == '__main__':
    main()

After some trial and error, I've at least figured out the right flux names for things. However, I'm getting a pretty nasty crash from the flux side of things that I'm not too sure how to parse:

Traceback (most recent call last):
  File "/sw/summit/spack-envs/base/opt/linux-rhel8-ppc64le/gcc-8.3.1/flux-core-0.35.0-lu5vqlynowz4ulykyst5lmv7c7sjzhcp/lib/flux/python3.9/flux/wrapper.py", line 194, in __call__
    result = self.fun(*args)
TypeError: initializer for ctype 'struct flux_handle_struct *' must be a cdata pointer, not bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "ExaConstit_NSGA3.py", line 674, in <module>
    pop_library = main(seed, checkpoint, checkpoint_freq)
  File "ExaConstit_NSGA3.py", line 375, in main
    fitness_eval = toolbox.map_custom(problem, 0, invalid_ind)
  File "ExaConstit_CP_Framework/Exaopt_DEAP/flux_map.py", line 71, in map_custom
    jobids.append(flux.job.submit(fh, jobspec, waitable=True))
  File "/sw/summit/spack-envs/base/opt/linux-rhel8-ppc64le/gcc-8.3.1/flux-core-0.35.0-lu5vqlynowz4ulykyst5lmv7c7sjzhcp/lib/flux/python3.9/flux/job/submit.py", line 132, in submit
    future = submit_async(flux_handle, jobspec, urgency, waitable, debug, pre_signed)
  File "/sw/summit/spack-envs/base/opt/linux-rhel8-ppc64le/gcc-8.3.1/flux-core-0.35.0-lu5vqlynowz4ulykyst5lmv7c7sjzhcp/lib/flux/python3.9/flux/job/submit.py", line 74, in submit_async
    future_handle = RAW.submit(flux_handle, jobspec, urgency, flags)
  File "/sw/summit/spack-envs/base/opt/linux-rhel8-ppc64le/gcc-8.3.1/flux-core-0.35.0-lu5vqlynowz4ulykyst5lmv7c7sjzhcp/lib/flux/python3.9/flux/wrapper.py", line 196, in __call__
    raise InvalidArguments(
flux.wrapper.InvalidArguments: 
Invalid arguments passed to wrapped C function:
Name: submit
C signature: struct flux_future *(*)(struct flux_handle_struct *, char *, int, int)
Arguments: ('ExaConstit_CP_Framework/Exaopt_DEAP/wf_files/gen_0/gene_0_obj_0/mechanics.flux', '{"resources": [{"type": "node", "count": 1, "with": [{"type": "slot", "count": 1, "with": [{"type": "core", "count": 4}], "label": "task"}]}], "tasks": [{"command": ["flux", "broker", "ExaConstit_CP_Framework/Exaopt_DEAP/wf_files/gen_0/gene_0_obj_0/mechanics.flux"], "slot": "task", "count": {"per_slot": 1}}], "attributes": {"system": {"duration": 0, "shell": {"options": {"per-resource": {"type": "node"}, "output": {"stdout": {"type": "file", "path": "ExaConstit_CP_Framework/Exaopt_DEAP/wf_files/gen_0/gene_0_obj_0/optimization_out.txt"}, "stderr": {"type": "file", "path": "ExaConstit_CP_Framework/Exaopt_DEAP/wf_files/gen_0/gene_0_obj_0/optimization_err.txt"}}}}, "cwd": "ExaConstit_CP_Framework/Exaopt_DEAP/wf_files/gen_0/gene_0_obj_0/"}}, "version": 1}', 16, 4)

Let me know if it might be useful to have access to the full set of python code and shell script to test things as well and I can provide them as well.

jameshcorbett commented 2 years ago

I think the problem is that you have a variable fh which is initialized with fh = flux.Flux() at the top of the map_custom function but then overwritten to be a string later in the function with fh = os.path.join(fdironl, os.path.basename("mechanics.flux")). Then when you call jobids.append(flux.job.submit(fh, jobspec, waitable=True)) a few lines later, you're passing in a string for fh where flux.job.submit() expects a flux.Flux instance. I think if you just keep the string and the flux handle separate (maybe call it flux_handle or something) you'll get past the error.
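A minimal sketch of the naming fix described above. It uses a stand-in class rather than a real flux.Flux handle so that it runs without Flux installed; `FakeHandle`, `submit_stub`, `submit_all`, and `script_path` are hypothetical names for illustration only. The only point is that the handle and the script path live in separate variables.

```python
import os


class FakeHandle:
    """Stand-in for flux.Flux(); hypothetical, for illustration only."""


def submit_stub(handle, script):
    # Mimics the kind of type check that failed in the traceback: the first
    # argument must be a handle object, not a path string.
    if not isinstance(handle, FakeHandle):
        raise TypeError("expected a handle, got " + type(handle).__name__)
    return script


def submit_all(run_dirs):
    flux_handle = FakeHandle()  # this name is reserved for the handle
    submitted = []
    for run_dir in run_dirs:
        # The script path gets its own name, so flux_handle is never overwritten.
        script_path = os.path.join(run_dir, "mechanics.flux")
        submitted.append(submit_stub(flux_handle, script_path))
    return submitted
```

In the real driver, `flux_handle` would be the `flux.Flux()` instance and `submit_stub` would be `flux.job.submit`.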

rcarson3 commented 2 years ago

@jameshcorbett that seemed to work, and I guess I completely missed that I'd already declared the flux handle as fh when introducing some new code. I'm now running into an issue where the job doesn't seem to run and flux is returning the following error after waiting on the job:

[249829523456, False, b'Fatal exception type=exec task 0: start failed: flux: No such file or directory']

I've tried setting the debug flag to true when submitting the job to see if that would provide any more useful info about which file or directory is missing, with little luck. I will say that the "mechanics.flux" file doesn't appear to be running: on Summit I made a simple modification so that the outer and inner job submissions would have different output and error files, and only the outer flux job's files are getting created.
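For reference, the triple printed above has the shape (job ID, success flag, error message), and the third field is a bytes message rather than an error number. A small sketch of turning such a triple into a readable line follows; `summarize_wait_result` is a hypothetical helper, not part of the Flux API.

```python
def summarize_wait_result(jobid, success, errstr):
    """Turn a (jobid, success, errstr) triple into a short status line."""
    if success:
        return "job {} succeeded".format(jobid)
    # The third field arrives as bytes in the output above; decode defensively.
    if isinstance(errstr, bytes):
        errstr = errstr.decode("utf-8", errors="replace")
    return "job {} failed: {}".format(jobid, errstr)
```

Logging this line per job makes it easier to spot which submissions failed and why.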

grondo commented 2 years ago

start failed: flux: No such file or directory indicates that the job is trying to run the command flux and it isn't found in PATH. I think the problem is that flux.job.JobspecV1.from_nest_command() has created a jobspec with an empty environment by default. Try adding:

    jobspec.environment = os.environ

To copy the current environment into the jobspec.
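A rough sketch of how one might prepare that environment and check up front that a command such as flux would resolve in it. `build_job_environment` and `command_on_path` are hypothetical helpers that use only the standard library; the `overrides` parameter is an added convenience, not something Flux requires.

```python
import os
import shutil


def build_job_environment(overrides=None):
    """Return a plain dict copy of the current environment.

    `overrides` is a hypothetical convenience for adding or replacing
    variables before the dict is handed to the jobspec.
    """
    env = dict(os.environ)
    if overrides:
        env.update(overrides)
    return env


def command_on_path(command, env):
    """Report whether `command` resolves using the PATH stored in `env`."""
    return shutil.which(command, path=env.get("PATH", "")) is not None
```

With Flux, the returned dict would be assigned as `jobspec.environment = build_job_environment()`, and `command_on_path("flux", env)` gives a quick check before submitting.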

rcarson3 commented 2 years ago

@grondo that seemed to help. The jobs are starting to run, but then they fail immediately or hang, as it appears that Spectrum MPI isn't being picked up in the nested flux instance that's the mechanics.flux file, even though I can see it listed in the environment variables that the nested flux instance is using...

Also, I'm able to run the executable that flux is calling in the nested instance just fine if I make use of jsrun within the same script file that drives this python workflow. So, I'm guessing something just isn't getting loaded correctly from the python flux side of things, or I'm missing something else in the nested flux instance file.

MPI error

[h24n12:2320110] mca_base_component_repository_open: unable to open mca_schizo_lsf: libbat.so: cannot open shared object file: No such file or directory (ignored)
[h24n12:2320105] mca_base_component_repository_open: unable to open mca_schizo_lsf: libbat.so: cannot open shared object file: No such file or directory (ignored)
[h24n12:2320095] mca_base_component_repository_open: unable to open mca_schizo_lsf: libbat.so: cannot open shared object file: No such file or directory (ignored)
[h24n12:2320099] mca_base_component_repository_open: unable to open mca_schizo_lsf: libbat.so: cannot open shared object file: No such file or directory (ignored)
--------------------------------------------------------------------------
A requested component was not found, or was unable to be opened.  This
means that this component is either not installed or is unable to be
used on your system (e.g., sometimes this means that shared libraries
that the component requires are unable to be found/loaded).  Note that
Open MPI stopped checking at the first component that it did not find.

Host:      h24n12
Framework: pml
Component: yalla
--------------------------------------------------------------------------
--------------------------------------------------------------------------
It looks like MPI_INIT failed for some reason; your parallel process is
likely to abort.  There are many reasons that a parallel process can
fail during MPI_INIT; some of which are due to configuration or environment
problems.  This failure appears to be an internal failure; here's some
additional information (which may only be relevant to an Open MPI
developer):

  mca_pml_base_open() failed
  --> Returned "Not found" (-13) instead of "Success" (0)
--------------------------------------------------------------------------
[h24n12:2320095] *** An error occurred in MPI_Init
[h24n12:2320095] *** reported by process [0,0]
[h24n12:2320095] *** on a NULL communicator
[h24n12:2320095] *** Unknown error
[h24n12:2320095] *** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
[h24n12:2320095] ***    and potentially your MPI job)

grondo commented 2 years ago

Hm, it does look like you are setting the mpi shell option to spectrum in the mechanics.flux script, so that much should be working. I'm afraid I don't know enough about Spectrum MPI to be of much more assistance here. @jameshcorbett may have some further suggestions.

I assume MPI didn't fail before converting from a shell script to Python? One thing you could try would be to compare the environment in each case, e.g. flux mini run printenv vs setting printenv as the from_command() command.
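Once both printenv outputs are saved, the comparison itself can be sketched like this. `parse_printenv` and `diff_environments` are hypothetical helpers, and the code assumes one KEY=VALUE pair per line.

```python
def parse_printenv(text):
    """Parse printenv output (one KEY=VALUE per line) into a dict."""
    env = {}
    for line in text.splitlines():
        key, sep, value = line.partition("=")
        if sep:  # skip lines without '=' (e.g. the tail of a multi-line value)
            env[key] = value
    return env


def diff_environments(cli_env, python_env):
    """Return variables missing from either side or holding different values."""
    only_cli = sorted(set(cli_env) - set(python_env))
    only_python = sorted(set(python_env) - set(cli_env))
    changed = sorted(
        key for key in set(cli_env) & set(python_env)
        if cli_env[key] != python_env[key]
    )
    return {"only_cli": only_cli, "only_python": only_python, "changed": changed}
```

Variables such as PATH, LD_LIBRARY_PATH, or MPI-related settings showing up under `only_cli` or `changed` would be the first things to look at.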

jameshcorbett commented 2 years ago

I don't have any further suggestions. I would try whether something like [jsrun ...] flux start flux mini run -ompi=spectrum -N1 -n4 -g0 exaconstit-mechanics-v0_6_1 -opt options.toml works though. If so then I would try [jsrun ...] flux start flux mini alloc -N1 -n1 -c40 flux mini run -ompi=spectrum -N1 -n4 -g0 exaconstit-mechanics-v0_6_1 -opt options.toml (I'm just trying to progressively get closer to what your script is doing).

rcarson3 commented 2 years ago

@jameshcorbett okay I think I figured out the issue. It looks like at some point I had added flux-core as a module to load in my job script, as it appeared this was necessary to get my python script to run. It turns out that was not actually necessary and was causing issues further down the road. Outside of that I ran into a few other issues which were due to the cwd for the nested flux job not being set.

At this point I think the only thing I need to figure out is how to pipe the output of my workflow python file in the below to an output file as they don't appear to be going to the out file or stdout. While testing things I'd been using the interactive version of flux, so I could see all the output but that doesn't appear to be the case when running the below. I will mention that I've at least gotten partially around this issue by just making use of my workflow loggers in place of the python print statements in certain areas.

jsrun -a 1 -c ALL_CPUS -g ALL_GPUS -n ${NUM_NODES} --bind=none --smpiargs="-disable_gpu_hooks" flux start -o,-Slog-filename=out python3 workflow.py

Nonetheless, I now have something that appears to be working. The one thing that might be nice though is the ability to get out the runtime of the jobs once they've finished. I know there must be a way to do this as my CLI flux workflow outputs that info, but I'm just not sure what option does that...

Once I clean up the workflow a bit, I'll be adding it to the ExaConstit repo, and I'll post the link to it here just as a future reference of a python Flux example.

grondo commented 2 years ago

The one thing that might be nice though is the ability to get out the runtime of the jobs once they've finished

The runtime of a job can be obtained from the JobInfo object for the job, which can be obtained from flux.job.JobList or flux.job.job_list_id().

Here's an example of a small program using the JobList class as an example:

import sys
import flux
from flux.job import JobList, JobID

# Skip sys.argv[0] (the script name); the remaining arguments are job IDs.
jobs = JobList(flux.Flux(), ids=[JobID(x) for x in sys.argv[1:]]).jobs()
for job in jobs:
    print(f"{job.id} runtime: {job.runtime:12.3f}s")

rcarson3 commented 2 years ago

Thanks @grondo that appears to work quite well for my needs.

jameshcorbett commented 2 years ago

At this point I think the only thing I need to figure out is how to pipe the output of my workflow python file in the below to an output file as they don't appear to be going to the out file or stdout. While testing things I'd been using the interactive version of flux, so I could see all the output but that doesn't appear to be the case when running the below. I will mention that I've at least gotten partially around this issue by just making use of my workflow loggers in place of the python print statements in certain areas.

Hmm, this seemed to work for me:

[corbett8@lassen34:~]$ jsrun -a 1 -c ALL_CPUS -g ALL_GPUS -n ${NUM_NODES} --bind=none --smpiargs="-disable_gpu_hooks" flux start -o,-Slog-filename=out python3 -c "print('foobar')"
foobar

(Although yeah the file out just has a bunch of flux log files, not the output you would be looking for.)

There are a lot of potential culprits here. Have you tried redirecting stdout/stderr to a file and seeing if that shows up? E.g.

jsrun -a 1 -c ALL_CPUS -g ALL_GPUS -n ${NUM_NODES} --bind=none --smpiargs="-disable_gpu_hooks" flux start -o,-Slog-filename=out python3 workflow.py > workflow.out 2>&1

Or maybe try these jsrun options:

  -k, --stdio_stderr=<filename>   Stdio stderr.
  -o, --stdio_stdout=<filename>   Stdio output.

rcarson3 commented 2 years ago

@jameshcorbett so as I'm working with an optimization problem the python workflow script can take a bit to finish running even for the small tests. It turns out that eventually things are printed out to stdout either after the job has finished or after quite a few minutes. So, it's definitely not "instantaneous" output. Also, I had tried redirecting the output to a file as you show up above and that one did not output anything if I ctrl-c the job. It's one of the reasons I thought earlier that things just weren't working.

I just tested the jsrun options you listed above and ran into the same issues as if I'd had the output redirected to a file using >.

Luckily, the logging method seems to work well enough that I can still monitor the progress of the optimization script as things are progressing.

grondo commented 2 years ago

I can't be certain if this has anything to do with output only appearing "after a few minutes", but one thing to check is to ensure that your tests are not using "fully buffered" IO. You can ensure output is flushed by either adding flush=True to print statements, or perhaps try one of the other suggestions in this SO post.
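A minimal sketch of the flush=True suggestion, in case it helps. `progress` is a hypothetical helper; when stdout is redirected to a file or pipe, Python typically block-buffers it, which would match output only appearing late.

```python
import sys


def progress(message, stream=None):
    """Print a progress line and flush it immediately."""
    if stream is None:
        stream = sys.stdout
    # flush=True pushes the line out right away instead of waiting for the
    # buffer to fill or for the interpreter to exit.
    print(message, file=stream, flush=True)
```

Running the script with `python3 -u` or setting `PYTHONUNBUFFERED=1` has a similar effect without touching the print calls.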

rcarson3 commented 2 years ago

@jameshcorbett so I just noticed that I'm seeing a bunch of core.* files generated with a number of the flux runs, which don't appear if I just do a jsrun instead. I didn't notice this earlier as I was just focused on making sure things work. Currently, I'm not too sure what's causing these core files to appear, as the jobs appear to be running to completion based on the output files. I will say that it isn't just the Python driver of flux causing the issue, as I've reproduced it with a CLI version of things as well. It also appears to be nondeterministic, so certain jobs that produced a core file during one run of flux won't produce one another time.

I'll have to look into the core issue as it's possible that something in the newer version of my code is crashing when being run on Summit as I don't recall this being an issue with earlier versions of my code and using flux...

edit: Just ran Valgrind on Summit for it and am not seeing any obvious memory leaks that could be causing issues in my newer binary. I'll try running valgrind on the flux call of things to see if that pops anything obvious.

edit2: Since the job is also running to completion, I might just rm the core files from within python, at least until I can figure out what's causing the issue.

grondo commented 2 years ago

Are the corefiles from your app or are they flux core files?

rcarson3 commented 2 years ago

@grondo so after poking at the core files using ARM Forge, it appears that the failure is in the darshan-core file which is called during MPI_Finalize. Particularly the output looks like:

#21 main () (at 0x000000001001ec3c)
#20 MPI_Finalize () at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-core-init-finalize.c:85 (at 0x00007fffb4d37ee4)
#19 darshan_core_shutdown () at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-core.c:529 (at 0x00007fffb4d3ba24)
#18 darshan_log_open (core=0x2a6d6650, log_fh=<optimized out>, logfile_name=0x2a7e6c30 "/gpfs/alpine/darshan/summit/2022/7/11/carson16_exaconstit-mechanics-v0_6_1_id1327616_7-11-58540-13453057104823224306.darshan_partial") at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-core.c:1540 (at 0x00007fffb4d3ba24)
#17 MPI_File_open (comm=0x2a6d6730, filename=0x2a7e6c30 "/gpfs/alpine/darshan/summit/2022/7/11/carson16_exaconstit-mechanics-v0_6_1_id1327616_7-11-58540-13453057104823224306.darshan_partial", amode=<optimized out>, info=0x2a79f8f0, fh=0x7fffe0d8d1d0) at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-mpiio.c:335 (at 0x00007fffb4d59620)
#16 PMPI_File_open () from /autofs/nccs-svm1_sw/summit/ums/gen007flux/linux-rhel8-ppc64le/spectrum-mpi/10.4.0.3-20210112/container/../lib/libmpi_ibm.so.3 (at 0x00007fffb4b73964)
#15 ompi_file_open () from /autofs/nccs-svm1_sw/summit/ums/gen007flux/linux-rhel8-ppc64le/spectrum-mpi/10.4.0.3-20210112/container/../lib/libmpi_ibm.so.3 (at 0x00007fffb4b3326c)
#14 mca_io_base_file_select () from /autofs/nccs-svm1_sw/summit/ums/gen007flux/linux-rhel8-ppc64le/spectrum-mpi/10.4.0.3-20210112/container/../lib/libmpi_ibm.so.3 (at 0x00007fffb4be4588)
#13 mca_io_ompio_file_open () from /autofs/nccs-svm1_sw/summit/ums/gen007flux/linux-rhel8-ppc64le/spectrum-mpi/10.4.0.3-20210112/container/../lib/spectrum_mpi/mca_io_ompio.so (at 0x00007fff88815fc8)
#12 mca_common_ompio_file_open () from /autofs/nccs-svm1_sw/summit/ums/gen007flux/linux-rhel8-ppc64le/spectrum-mpi/10.4.0.3-20210112/container/../lib/libmca_common_ompio.so.3 (at 0x00007fff887e838c)
#11 mca_sharedfp_sm_file_open () from /autofs/nccs-svm1_sw/summit/ums/gen007flux/linux-rhel8-ppc64le/spectrum-mpi/10.4.0.3-20210112/container/../lib/spectrum_mpi/mca_sharedfp_sm.so (at 0x00007fff88634b7c)
#10 opal_output () from /autofs/nccs-svm1_sw/summit/ums/gen007flux/linux-rhel8-ppc64le/spectrum-mpi/10.4.0.3-20210112/container/../lib/libopen-pal.so.3 (at 0x00007fff9efc0938)
#9 output.part () from /autofs/nccs-svm1_sw/summit/ums/gen007flux/linux-rhel8-ppc64le/spectrum-mpi/10.4.0.3-20210112/container/../lib/libopen-pal.so.3 (at 0x00007fff9efbf788)
#8 fileno (stream=0x7fff9f5016b0) at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-posix.c:607 (at 0x00007fffb4d43a88)
#7 darshan_stdio_lookup_record_name (stream=<optimized out>) at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-stdio.c:1305 (at 0x00007fffb4d55e0c)
#6 darshan_core_lookup_record_name (rec_id=<optimized out>) at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-core.c:2308 (at 0x00007fffb4d39bcc)
#5  <signal handler called>
#4 ucs_error_signal_handler () from /lib64/libucs.so.0 (at 0x00007fff9c523f44)
#3 ucs_log_flush () from /lib64/libucs.so.0 (at 0x00007fff9c524768)
#2 fileno (stream=0x7fff9f501790) at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-posix.c:607 (at 0x00007fffb4d43a88)
#1 darshan_stdio_lookup_record_name (stream=<optimized out>) at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-stdio.c:1305 (at 0x00007fffb4d55e0c)
#0 darshan_core_lookup_record_name (rec_id=<optimized out>) at /tmp/belhorn/spack-stage/spack-stage-darshan-runtime-3.2.1-om62upvgddsdrrjpfjfemns3zmnckg3u/spack-src/darshan-runtime/lib/darshan-core.c:2308 (at 0x00007fffb4d39bcc)

It does look like some of the calls for the MPI trace are coming from the flux modules for MPI, so I'm not too sure if this is an issue with one of y'all's installs or something from an OLCF package.

grondo commented 2 years ago

Thanks.

Sorry, I don't have any idea on that stack trace, except it looks a bit like your code is getting a signal (SIGSEGV maybe?) during darshan_core_lookup_record_name()? I could be misreading the trace though. I'm not sure what else to look for unfortunately.

rcarson3 commented 2 years ago

The error code returned from the program is 139, which definitely suggests that it's a SIGSEGV. It's still not clear to me what's causing it, other than maybe some issue internal to the MPI library that Flux is now using. Interestingly enough, I never ran into this issue when running my old flux workflows back in November on Summit. I've even tested my old binary from those runs, which didn't have any issues back then, in this new workflow, and I'm seeing the same core dump issue.
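
For reference, shells conventionally report a process killed by signal N as exit status 128 + N, so 139 maps to signal 11, which is SIGSEGV on Linux. A minimal sketch of that decoding (the helper name `signal_from_exit_status` is made up for illustration):

```python
import signal


def signal_from_exit_status(status: int) -> str:
    """Return the signal name implied by a shell-style exit status (128 + N)."""
    if status <= 128:
        raise ValueError(f"exit status {status} does not encode a signal")
    # signal.Signals raises ValueError if the number is not a known signal.
    return signal.Signals(status - 128).name


print(signal_from_exit_status(139))
```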

At this point though, it's not hurting my runs other than eating up a few more seconds for each run, and I can simply remove the core files from the Python script, so I'm not taking as much of a disk space hit.
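
The cleanup could look roughly like the sketch below. The `core*` glob is an assumption: the actual core file name depends on how the system is configured, and this pattern would also match any other file whose name starts with "core". The function name `remove_core_files` is hypothetical.

```python
from pathlib import Path


def remove_core_files(run_dir: str, pattern: str = "core*") -> int:
    """Delete core dump files directly inside run_dir; return how many were removed."""
    removed = 0
    for path in Path(run_dir).glob(pattern):
        # Skip directories and anything else that is not a regular file.
        if path.is_file():
            path.unlink()
            removed += 1
    return removed
```

Calling this on each job's working directory after the job finishes keeps the dumps from piling up across runs.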

rcarson3 commented 2 years ago

@grondo and @jameshcorbett so I was finally able to get the workflow that was driving this out onto ExaConstit's repo after working through the details to preserve the git history of the original repo it was developed in. You can see the relevant flux code here: https://github.com/LLNL/ExaConstit/blob/exaconstit-dev/workflows/optimization/flux_map.py and here: https://github.com/LLNL/ExaConstit/blob/exaconstit-dev/workflows/optimization/master_flux.flux .

rcarson3 commented 2 years ago

@jameshcorbett so I just tested with a GPU-enabled job as I'm preparing for the large-scale job. For these jobs, I'm just running 1 CPU and 1 GPU per job submission on a node of Summit. I'm seeing about 5-6 jobs launch successfully, and then I see something like this from the inner flux instance

    jobid, istatus, errnum = flux.job.wait(fh, flux.job.submit(fh, jobspec, waitable=True))
    print(["inner flux job ", jobid, istatus, errnum])
# above prints something like:
# ['inner flux job ', 19713228800, False, b'Fatal exception type=alloc alloc denied due to type="unsatisfiable"']

returned from the rest of the jobs. My flux python scripts look like the ones at the links I listed above. Do you see anything in https://github.com/LLNL/ExaConstit/blob/exaconstit-dev/workflows/optimization/flux_map.py that I might need to modify to have the other jobs wait for the resources to become available?

grondo commented 2 years ago

I can look at this further, but an unsatisfiable request job exception indicates that the submitted jobspec cannot ever be satisfied by the current instance, even if all resources were currently free.
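
As a toy illustration of that distinction (this is not how the Flux scheduler is implemented; the `Resources` class and `classify` function are hypothetical), a request that does not fit in the instance's total resources is unsatisfiable, while one that fits in the total but not in what is currently free only has to wait:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Resources:
    cores: int
    gpus: int

    def fits_in(self, other: "Resources") -> bool:
        """True if every requested count is available in `other`."""
        return self.cores <= other.cores and self.gpus <= other.gpus


def classify(request: Resources, total: Resources, free: Resources) -> str:
    """Classify a request against an instance's total and currently free resources."""
    if not request.fits_in(total):
        return "unsatisfiable"  # could never run, even on an idle instance
    if not request.fits_in(free):
        return "wait"  # could run once other jobs release resources
    return "run"


# Hypothetical instance that was created with a core but no GPU.
no_gpu_instance = Resources(cores=1, gpus=0)
print(classify(Resources(cores=1, gpus=1), total=no_gpu_instance, free=no_gpu_instance))
```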

To debug, I would print the jobspec that got the error, and the resource set of the instance to which you are submitting. Could it be that you are inadvertently creating an instance without a GPU and then submitting a request that includes a GPU?

It may also be helpful to run something like flux pstree --details=resources at the top-level instance to get an idea of the shape of your instance hierarchy.

rcarson3 commented 2 years ago

@grondo unfortunately flux pstree isn't available on y'all's Summit install of flux as that version is only 0.29.0 and it looks like that feature wasn't added until 0.34.0. I've also tried using the print(flux.resource.list.resource_list(flux_handle).get()) based on https://flux-framework.readthedocs.io/projects/flux-core/en/latest/python/autogenerated/flux.resource.list.html to see if I could print out the resource sets that way but Python complains about flux.resource.list not being available. If a more recent version could be installed then I could track down the issue, or if a newer version is available on Lassen I could try my scripts out on there and see if I can't reproduce the above set of errors.

I'm also wondering if part of the issue isn't due to submitting N number of jobs on a single node where the resource set isn't set to exclusive for each from_nest_command submission as seen down below:

            jobspec = flux.job.JobspecV1.from_nest_command(
                ["python3", fh],
                num_slots=problem.nnodes,
                cores_per_slot=problem.ncpus,
                gpus_per_slot=problem.ngpus,
                num_nodes=problem.nnodes,
            )
            jobspec.cwd = fdironl
            jobspec.stdout = "flux_output.txt"
            jobspec.stderr = "flux_error.txt"
            jobspec.environment = dict(os.environ)
            jobspec.setattr_shell_option("mpi", "spectrum")
            jobspec.setattr_shell_option("gpu-affinity", "per-task")
            jobspec.setattr_shell_option("cpu-affinity", "per-task")
            jobspec.duration = int(problem.timeout[iobj])
            jobids.append(flux.job.submit(flux_handle, jobspec, waitable=True))

I saw that there is an option for exclusive in there, but it sounded like that was to set things as exclusive access per node rather than just for the resources. I am wondering if, to subdivide a node into more than 1 job, I might need a nested flux instance that has exclusive access to a node and then creates 6 additional nested flux instances that each require only 1 CPU and 1 GPU at a time.

grondo commented 2 years ago

unfortunately flux pstree isn't available on y'all's Summit install of flux as that version is only 0.29.0 and it looks like that feature wasn't added until 0.34.0.

Ah, yes, I apologize for the age of the Flux install on Summit. Lassen does have a newer version, though we've noticed that the system installation of Flux there (/usr/bin/flux) may have an issue bootstrapping under jsrun, so one of the /usr/global/tools versions should be used instead (See this FAQ entry)

I'm not sure how to ensure a newer version of Flux makes its way to Summit. I'll work with James to see when he returns.

I'm also wondering if part of the issue isn't due to submitting N number of jobs on a single node where the resource set isn't set to exclusive for each from_nest_command submission as seen down below:

The exclusive flag is supposed to tell the scheduler that if you allocate any part of a composite resource (e.g. a node), then all child resources should also be included in the resulting allocation. This is currently only really useful when the Fluxion scheduler (flux-sched) is configured with one of the exclusive (hinodex, lonodex) policies, so I don't think it will be useful for you here.

I'm afraid I'm only vaguely familiar with your workflow here, and how you are using Flux to solve it, so at the moment I'm having trouble coming up with other things to try.

When re-reading all the comments above it looks like your script submits a nested Flux instance for each "problem", then runs a single job in each instance (described by a mechanics.flux script)? Is that the gist of it? I apologize if I got this wrong. (I feel like I indeed have misunderstood since I don't see where more than one job is submitted to an "inner" Flux instance)

In case it is useful, a more traditional way to use Flux in a scenario where you want to submit many jobs would be to run a single nested Flux instance and submit many jobs to that instance. This would be more efficient since you wouldn't have to bootstrap another Flux instance for each job. (This comment only applies if I've understood how you are using Flux now). Also the one instance would have access to all resources, so you are less likely to be plagued by the unsatisfiability errors.
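
A rough sketch of that pattern: submit everything to the one instance first, then wait on each job. The `submit` and `wait` callables are injected so the sketch runs without Flux installed, and `run_all` is a hypothetical helper name.

```python
from typing import Any, Callable, Iterable, List


def run_all(
    submit: Callable[[Any], int],
    wait: Callable[[int], Any],
    jobspecs: Iterable[Any],
) -> List[Any]:
    """Submit every jobspec to one instance, then wait on each job in order."""
    # Submit everything up front so the scheduler can queue the jobs and
    # start each one as resources free up.
    jobids = [submit(spec) for spec in jobspecs]
    # Only then block on the results, in submission order.
    return [wait(jobid) for jobid in jobids]
```

With a Flux handle `fh`, the callables could presumably be `lambda spec: flux.job.submit(fh, spec, waitable=True)` and `lambda jobid: flux.job.wait(fh, jobid)`, the same calls used earlier in this thread.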

rcarson3 commented 2 years ago

Ah, yes, I apologize for the age of the Flux install on Summit. Lassen does have a newer version, though we've noticed that the system installation of Flux there (/usr/bin/flux) may have an issue bootstrapping under jsrun, so one of the /usr/global/tools versions should be used instead (See this FAQ entry)

I'm not sure how to ensure a newer version of Flux makes its way to Summit. I'll work with James to see when he returns.

Great I'll try out Lassen when I get a chance. Also, my team would definitely appreciate the work on Summit as we'll be running some challenge problem runs on there for the ExaAM project in preparation for our ultimate runs on Frontier.

When re-reading all the comments above it looks like your script submits a nested Flux instance for each "problem", then runs a single job in each instance (described by a mechanics.flux script)? Is that the gist of it? I apologize if I got this wrong. (I feel like I indeed have misunderstood since I don't see where more than one job is submitted to an "inner" Flux instance)

In case it is useful, a more traditional way to use Flux in a scenario where you want to submit many jobs would be to run a single nested Flux instance and submit many jobs to that instance. This would be more efficient since you wouldn't have to bootstrap another Flux instance for each job. (This comment only applies if I've understood how you are using Flux now). Also the one instance would have access to all resources, so you are less likely to be plagued by the unsatisfiability errors.

Yes, you've got the gist of this general workflow. I'd just been copying the one @dongahn and the rest of the Flux team had helped set me up with way back in 2020, where each job made use of multiple nodes and I didn't need to split a node up into multiple jobs. I'll give your suggestion a go and see how that goes.

rcarson3 commented 2 years ago

@grondo it appears that the nested flux instances were the issue here. I swapped things over to

            jobspec = flux.job.JobspecV1.from_command(
                [problem.bin_mechanics, "-opt", "options.toml"],
                num_nodes=problem.nnodes,
                num_tasks=problem.ncpus,
                cores_per_task=1,
                gpus_per_task=ngpus
            )

in the flux_map.py file so it no longer makes a nested flux instance and that seems to have worked.

grondo commented 2 years ago

Great! We may want to revisit why it was originally proposed to use a nested Flux instance for your use case. I didn't see an obvious reason, and I think most of the people that were probably in your original 2020 discussions are no longer available.

If you run into other issues with the current workload, we can perhaps discuss more.

vsoch commented 10 months ago

@rcarson3 were there other points of discussion or are we good to close this issue?

rcarson3 commented 10 months ago

@vsoch yes we can close this.