Snakemake-Profiles / slurm

Cookiecutter for snakemake slurm profile
MIT License

Grouped jobs: KeyError: 'params' #55

Closed CharlesJB closed 4 years ago

CharlesJB commented 4 years ago

Hello,

I am trying to get the slurm profile to work with grouped jobs. When I use my profile for a normal job, everything works fine, but I systematically get the following error when I try to launch grouped jobs:

Traceback (most recent call last):
  File "/home/username/.config/snakemake/slurm/slurm-submit.py", line 52, in <module>
    sbatch_options = slurm_utils.format_values(sbatch_options, job_properties)
  File "/home/username/.config/snakemake/slurm/slurm_utils.py", line 95, in format_values
    formatted[key] = format_wildcards(value, job_properties)
  File "/home/username/.config/snakemake/slurm/slurm_utils.py", line 68, in format_wildcards
    job._format_params = Wildcards(fromdict=job_properties['params'])
KeyError: 'params'

I'm doing tests with a minimal Snakefile:

sample_names = [1,2]

rule all:
    input:
        "test.out"

rule a:
    output:
        "a/{sample_name}.out"
    group: "mygroup"
    shell:
        "touch {output}"

rule b:
    input:
        "a/{sample_name}.out"
    output:
        "b/{sample_name}.out"
    group: "mygroup"
    shell:
        "touch {output}"

rule c:
    input:
        expand("b/{sample_name}.out", sample_name=sample_names)
    output:
        "test.out"
    shell:
        "touch {output}"

This is my slurm-submit.py:

#!/usr/bin/env python
"""
Snakemake SLURM submit script.
"""
import warnings  # use warnings.warn() rather than print() to output info in this script

from snakemake.utils import read_job_properties

import slurm_utils

# cookiecutter arguments
SBATCH_DEFAULTS = """"""
CLUSTER_CONFIG = "/home/username/.config/snakemake/slurm/cluster_config.yml"
ADVANCED_ARGUMENT_CONVERSION = {"yes": True, "no": False}["no"]

RESOURCE_MAPPING = {
    "time": ("time", "runtime", "walltime"),
    "mem": ("mem", "mem_mb", "ram", "memory"),
    "mem-per-cpu": ("mem-per-cpu", "mem_per_cpu", "mem_per_thread"),
    "nodes": ("nodes", "nnodes")
}

# parse job
jobscript = slurm_utils.parse_jobscript()
job_properties = read_job_properties(jobscript)

sbatch_options = {}
cluster_config = slurm_utils.load_cluster_config(CLUSTER_CONFIG)

# 1) sbatch default arguments
sbatch_options.update(slurm_utils.parse_sbatch_defaults(SBATCH_DEFAULTS))

# 2) cluster_config defaults
sbatch_options.update(cluster_config["__default__"])

# 3) Convert resources (no unit conversion!) and threads
sbatch_options.update(
    slurm_utils.convert_job_properties(job_properties, RESOURCE_MAPPING)
)

# 4) cluster_config for particular rule
sbatch_options.update(cluster_config.get(job_properties.get("rule"), {}))

# 5) cluster_config options
sbatch_options.update(job_properties.get("cluster", {}))

# 6) Advanced conversion of parameters
if ADVANCED_ARGUMENT_CONVERSION:
    sbatch_options = slurm_utils.advanced_argument_conversion(sbatch_options)

# 7) Format pattern in snakemake style
sbatch_options = slurm_utils.format_values(sbatch_options, job_properties)

# ensure sbatch output dirs exist
for o in ("output", "error"):
    if o in sbatch_options:
        slurm_utils.ensure_dirs_exist(sbatch_options[o])

# submit job and echo id back to Snakemake (must be the only stdout)
print(slurm_utils.submit_job(jobscript, **sbatch_options))
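
As I understand it, each numbered update() above means that later sources override earlier ones, e.g. with hypothetical values:

sbatch_options = {}
sbatch_options.update({"time": 60, "mem": "14GB"})  # 2) cluster_config __default__
sbatch_options.update({"time": 120})                # 4) rule-specific entry
# effective options: time=120 (rule entry wins), mem=14GB (default kept)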

And my cluster_config.yml:

__default__:
    account: my-hpc-account # your hpc account
    time: 60 # default time (minutes)
    nodes: 1
    ntasks: 1
    mem: 14GB # default memory

I'm using a Snakemake installation that I set up following this guide, and the version is:

(snakemake_global) ➜  groups snakemake --version
5.23.0

I am under the impression that the problem arises when the following line is executed:

job_properties = read_job_properties(jobscript)

Somehow, when the group directive is present, the params property is not set correctly. I tried adding a params directive to my Snakefile, but I still get the same error, whether the params value is an empty string or a normal string.

I'm not that good with Python and I'm not sure how to debug this. Do you have any idea how I could solve this problem?

Thanks! Charles.

fgvieira commented 4 years ago

This issue seems to be very similar to #48.

percyfal commented 4 years ago

Hi, I just merged #50; see if it solves your issue.

CharlesJB commented 4 years ago

Hello,

I tested the changes from #50 and now I am getting a similar error, but with 'wildcards' instead of 'params':

Traceback (most recent call last):
  File "/home/jolybeau/.config/snakemake/slurm/slurm-submit.py", line 52, in <module>
    sbatch_options = slurm_utils.format_values(sbatch_options, job_properties)
  File "/home/jolybeau/.config/snakemake/slurm/slurm_utils.py", line 98, in format_values
    formatted[key] = format_wildcards(value, job_properties)
  File "/home/jolybeau/.config/snakemake/slurm/slurm_utils.py", line 72, in format_wildcards
    job._format_wildcards = Wildcards(fromdict=job_properties['wildcards'])
KeyError: 'wildcards'

percyfal commented 4 years ago

Yes, I just added a test for group jobs and noticed the same issue; in addition, there was another property ('rule') that was undefined. There is an open PR (#58) that fixes this issue, which I hope to merge shortly.
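
The gist of the fix is to fall back to empty defaults for the properties that group jobs do not carry, roughly like this in slurm_utils.format_wildcards (a sketch, not the literal patch):

# group jobs carry no per-rule 'params', 'wildcards', or 'rule' entries,
# so default to empty values rather than indexing the dict directly
job._format_params = Wildcards(fromdict=job_properties.get("params", {}))
job._format_wildcards = Wildcards(fromdict=job_properties.get("wildcards", {}))
job.rule = job_properties.get("rule", "")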

GrandH2O commented 2 years ago

I got a similar error on an SGE system.

GrandH2O commented 2 years ago

My Snakemake version is 6.10.0. I wrote a minimal test as follows:

configfile: "config_test/config.yaml"

rule all:
    input:
        "test1.done",
        "test2.done",
        "TEST1.DONE",
        "TEST2.DONE"

def get_mem_mb(wildcards, attempt, basic_mem):
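    # scales the memory request with the retry attempt, squaring it after the
    # first try, e.g. basic_mem=1000: attempt 1 -> 1000, attempt 2 -> 4000, attempt 3 -> 9000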
    if attempt != 1:
        attempt = attempt * attempt
    return attempt * basic_mem

def get_threads(wildcards, attempt, threads):
    return attempt * threads

rule test:
    input:
        "{sample}.txt"
    output:
        "{sample}.done"
    threads: 2
    benchmark:
        os.path.join(config['benchmarkdir'], "{sample}.benchmark.tsv")
    group: "test"
    params:
        qsub_prefix=config['qsub_prefix'],
        logdir=config['logdir']
    shell:
        "sleep 5s && echo {input} > {output}"

rule test2:
    input:
        "{TEST}.TXT"
    output:
        "{TEST}.DONE"
    threads: 2
    params:
        qsub_prefix=config['qsub_prefix'],
        logdir=config['logdir']
    group: "test"
    shell:
        "sleep 5s && echo {input} > {output}"

GrandH2O commented 2 years ago

My profile is as follows:

import os
import re
import math
import argparse
import subprocess

# use warnings.warn() rather than print() to output info in this script
# because snakemake expects the jobid to be the only output
import warnings
import snakemake
from snakemake import io
from snakemake.utils import read_job_properties

DEFAULT_JOB_NAME = "snakemake_job"
QSUB_DEFAULTS = "-V"
CLUSTER_CONFIG = "sge.yaml"

# SGE syntax for options is `-option [value]` and for resources is `-l name=value`
# we therefore distinguish the two in this script to make it easier to handle.
# We also define some aliases for options and resources so that the rules can
# be more expressive than a list of cryptic SGE resources.

# We additionally pickup a list of environment modules which will be loaded in the
# jobscript

OPTION_MAPPING = {
    "binding": ("binding",),
    "cwd"    : ("cwd",),
    "e"      : ("e", "error"),
    "hard"   : ("hard",),
    "j"      : ("j", "join"),
    "m"      : ("m", "mail_options"),
    "M"      : ("M", "email"),
    "notify" : ("notify",),
    "now"    : ("now",),
    "N"      : ("N", "name"),
    "o"      : ("o", "output"),
    "P"      : ("P", "project"),
    "p"      : ("p", "priority"),
    "pe"     : ("pe", "threads", "parallel_environment"),
    "pty"    : ("pty",),
    "q"      : ("q", "queue"),
    "R"      : ("R", "reservation"),
    "r"      : ("r", "rerun"),
    "soft"   : ("soft",),
    "v"      : ("v", "variable"),
    "V"      : ("V", "export_env"),
    "wd"     : ("wd",)
}

RESOURCE_MAPPING = {
    # default queue resources
    "qname"            : ("qname",),
    "hostname"         : ("hostname",),
    # "notify" -- conflicts with OPTION_MAPPING
    "calendar"         : ("calendar",),
    "min_cpu_interval" : ("min_cpu_interval",),
    #"tmpdir"           : ("tmpdir",), not in genecast qsub
    "seq_no"           : ("seq_no",),
    "s_rt"             : ("s_rt", "soft_runtime", "soft_walltime"),
    "h_rt"             : ("h_rt", "time", "runtime", "walltime"),
    "s_cpu"            : ("s_cpu", "soft_cpu"),
    "h_cpu"            : ("h_cpu", "cpu"),
    "s_data"           : ("s_data", "soft_data"),
    "h_data"           : ("h_data", "data"),
    "s_stack"          : ("s_stack", "soft_stack"),
    "h_stack"          : ("h_stack", "stack"),           
    "s_core"           : ("s_core", "soft_core"),
    "h_core"           : ("h_core", "core"),
    "s_rss"            : ("s_rss", "soft_resident_set_size"),
    "h_rss"            : ("h_rss", "resident_set_size"),
    # default host resources
    "slots"            : ("slots",),
    "s_vmem"           : ("s_vmem", "soft_memory", "soft_virtual_memory"),
    #"h_vmem"           : ("h_vmem", "mem", "memory", "virtual_memory"),
    "s_fsize"          : ("s_fsize", "soft_file_size"),
    "h_fsize"          : ("h_fsize", "file_size"),
    "mf"               : ("mem_mb",)
}
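
# e.g. a rule declaring `resources: mem_mb=4000, runtime=120` is translated, via
# the aliases above and parse_qsub_settings below, into `-l mf=4000M -l h_rt=120`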

def add_custom_resources(resources, resource_mapping=RESOURCE_MAPPING):
    """Adds new resources to resource_mapping.

       resources -> dict where key is sge resource name and value is a 
                    single name or a list of names to be used as aliased
    """
    for key, val in resources.items():
        if key not in resource_mapping:
            resource_mapping[key] = tuple()

        # make sure the resource name itself is an alias
        resource_mapping[key] += (key,)
        if isinstance(val, list):
            for alias in val:
                if alias != key:
                    resource_mapping[key] += (alias,)
        else:
            if val != key:
                resource_mapping[key] += (val,)

def add_custom_options(options, option_mapping=OPTION_MAPPING):
    """Adds new options to option_mapping.

       options -> dict where key is sge option name and value is a single name
                  or a list of names to be used as aliased
    """
    for key, val in options.items():
        if key not in option_mapping:
            option_mapping[key] = tuple()

        # make sure the option name itself is an alias
        option_mapping[key] += (key,)
        if isinstance(val, list):
            for alias in val:
                if alias != key:
                    option_mapping[key] += (alias,)
        else:
            if val != key:
                option_mapping[key] += (val,)
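
# hypothetical usage: add_custom_resources({"h_vmem": ["mem", "memory"]}) registers
# h_vmem (commented out of RESOURCE_MAPPING above) so that a rule's `resources: mem=4`
# would be emitted as `-l h_vmem=4G`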

def parse_jobscript():
    """Minimal CLI to require/only accept single positional argument."""
    p = argparse.ArgumentParser(description="SGE snakemake submit script")
    p.add_argument("jobscript", help="Snakemake jobscript with job properties.")
    return p.parse_args().jobscript

def parse_qsub_defaults(parsed):
    """Unpack QSUB_DEFAULTS."""
    d = parsed.split() if isinstance(parsed, str) else parsed

    options={}
    for arg in d:
        if "=" in arg:
            k, v = arg.split("=", 1)
            options[k.strip("-")] = v.strip()
        else:
            options[arg.strip("-")] = ""
    return options
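
# e.g. parse_qsub_defaults("-V h_rt=24:00:00") -> {"V": "", "h_rt": "24:00:00"}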

def format_job_properties(string):
    # we use 'rulename' rather than 'rule' for consistency with the --cluster-config
    # snakemake option
    return string.format(
        rulename=job_properties['rule'],
        jobid=job_properties['jobid'],
        qsub_prefix=job_properties['params']['qsub_prefix'],
    )

def parse_qsub_settings(source, resource_mapping=RESOURCE_MAPPING, option_mapping=OPTION_MAPPING):
    job_options = { "options" : {}, "resources" : {}}

    for skey, sval in source.items():
        found = False
        if skey == 'tmpdir':
            continue
        for rkey, rval in resource_mapping.items():
            if skey in rval:
                found = True
                # Snakemake resources can only be defined as integers, but SGE interprets
                # plain integers for memory as bytes. This hack means we interpret memory
                # requests as gigabytes
                if (rkey == 's_vmem') or (rkey == 'h_vmem'):
                    job_options["resources"].update({rkey : str(sval) + 'G'})
                elif (rkey == 'mf'):
                    job_options['resources'].update({rkey : str(sval) + 'M'})
                else:
                    job_options["resources"].update({rkey : sval})
                break
        if found: continue
        for okey, oval in option_mapping.items():

            if skey == 'threads' and skey in oval:
                found = True
                pe_str = " smp " + str(sval)
                job_options['options'].update({okey: pe_str})
                break
            elif skey in oval:
                found = True
                job_options["options"].update({okey : sval})
                break
        if not found:
            raise KeyError(f"Unknown SGE option or resource: {skey}")

    return job_options
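
# e.g. parse_qsub_settings({"mem_mb": 4000, "runtime": 120, "queue": "all.q"})
# -> {"options": {"q": "all.q"}, "resources": {"mf": "4000M", "h_rt": 120}}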

def load_cluster_config(path):
    """Load config to dict either from absolute path or relative to profile dir."""
    if path:
        path = os.path.join(os.path.dirname(__file__), os.path.expandvars(path))
        default_cluster_config = io.load_configfile(path)
    else:
        default_cluster_config = {}
    if "__default__" not in default_cluster_config:
        default_cluster_config["__default__"] = {}
    return default_cluster_config

def ensure_directory_exists(path):
    """Check if directory exists and create if not"""
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    return

def update_double_dict(outer, inner):
    """Similar to dict.update() but does the update on nested dictionaries"""
    for k, v in outer.items():
        outer[k].update(inner[k])

def sge_option_string(key, val):
    if val == "":
        return f"-{key}"
    if isinstance(val, bool):
        return f"-{key} " + ("yes" if val else "no")
    return format_job_properties(f"-{key} {val}")

def sge_resource_string(key, val):
    if val == "":
        return f"-l {key}"
    if isinstance(val, bool):
        return f"-l {key}=" + ("true" if val else "false")
    return f"-l {key}={val}"

def submit_job(jobscript, qsub_settings):
    """Submit jobscript and return jobid."""
    flatten = lambda l: [item for sublist in l for item in sublist]
    batch_options = flatten([sge_option_string(k,v).split() for k, v in qsub_settings["options"].items()])
    batch_resources = flatten([sge_resource_string(k, v).split() for k, v in qsub_settings["resources"].items()])
    # -terse means only the jobid is returned rather than the normal 'Your job...' string
    jobid = subprocess.check_output(
        ["qsub", "-terse"] + batch_options + batch_resources + [jobscript]
    ).decode().rstrip()
    # print(["qsub", "-terse"] + batch_options + batch_resources + [jobscript])
    return jobid

jobscript = parse_jobscript()

# get the job properties dictionary from snakemake 
job_properties = read_job_properties(jobscript)
os.system(f'cp {jobscript} .')
#print(job_properties)
qsub_settings = {
    "options": {"wd": job_properties['params']['logdir']},
    "resources": {},
}

# load the default cluster config
cluster_config = load_cluster_config(CLUSTER_CONFIG)

add_custom_resources(cluster_config["__resources__"])

add_custom_options(cluster_config["__options__"])

# qsub default arguments
update_double_dict(qsub_settings, parse_qsub_settings(parse_qsub_defaults(QSUB_DEFAULTS)))

# cluster_config defaults
update_double_dict(qsub_settings, parse_qsub_settings(cluster_config["__default__"]))

# resources defined in the snakemake file (note that these must be integer)
# we pass an empty dictionary for option_mapping because options should not be
# specified in the snakemake file
update_double_dict(qsub_settings, parse_qsub_settings(job_properties.get("resources", {}), option_mapping={}))
# get any mem_mb-specific options from job_properties, default 1000M
update_double_dict(qsub_settings, parse_qsub_settings({'mem_mb':job_properties.get('resources', {}).get('mem_mb', 1000)}))

# get any threads-specific options from job_properties, default 1 thread
update_double_dict(qsub_settings, parse_qsub_settings({'threads':job_properties.get('threads', 1)}))

# get any rule specific options/resources from the default cluster config
update_double_dict(qsub_settings, parse_qsub_settings(cluster_config.get(job_properties.get("rule"), {})))

# get any options/resources specified through the --cluster-config command line argument
update_double_dict(qsub_settings, parse_qsub_settings(job_properties.get("cluster", {})))

# ensure qsub output dirs exist
for o in ("o", "e"):
    if o in qsub_settings["options"]:
        ensure_directory_exists(qsub_settings["options"][o])

# submit job and echo id back to Snakemake (must be the only stdout)
print(submit_job(jobscript, qsub_settings))

GrandH2O commented 2 years ago

I caught the generated shell script, as follows:

#!/bin/bash
# properties = {"type": "group", "groupid": "test", "local": false, "input": ["TEST2.TXT"], "output": ["TEST2.DONE"], "threads": 2, "resources": {"tmpdir": "/tmp"}, "jobid": "d3c6ba57-88a1-5c0c-b373-d8f3260f65bb", "cluster": {}}

# exit on first error
set -o errexit

 cd /beegfs/work/user/tan.yuntao/pipeline/test_wes_on_version/test_chr6 && \
/beegfs/work/user/rdd_admin/software/miniconda39/envs/snakemake/bin/python3.9 \
-m snakemake TEST2.DONE --snakefile /beegfs/work/user/tan.yuntao/pipeline/test_wes_on_version/test_chr6/test.smk \
--force --cores all --keep-target-files --keep-remote --max-inventory-time 0 \
--wait-for-files '/beegfs/work/user/tan.yuntao/pipeline/test_wes_on_version/test_chr6/.snakemake/tmp.o68xx4st' 'TEST2.TXT' --latency-wait 15 \
 --attempt 4  --scheduler ilp \
--wrapper-prefix https://github.com/snakemake/snakemake-wrappers/raw/ \
   --allowed-rules test2 --nocolor --notemp --no-hooks --nolock --scheduler-solver-path /beegfs/work/user/rdd_admin/software/miniconda39/envs/snakemake/bin \
--mode 2  --default-resources "tmpdir=system_tmpdir"  && exit 0 || exit 1

There is no params entry in the properties.
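
I suppose the submit script needs a guard for group jobs, something like this untested sketch (the current-directory fallback is just a placeholder):

# group jobs have no per-rule "params", so fall back to a default logdir
params = job_properties.get("params", {})
qsub_settings = {
    "options": {"wd": params.get("logdir", os.getcwd())},
    "resources": {},
}

The qsub_prefix lookup in format_job_properties would need the same kind of fallback.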