dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

SGECluster in multiple queues #378

Open tiagofrepereira2012 opened 4 years ago

tiagofrepereira2012 commented 4 years ago

Hi,

I'm opening this issue here because I'm not having much success on Stack Overflow (https://stackoverflow.com/questions/59391000/sgecluster-in-multiple-queues). Sorry for the noise.

I'm using dask.distributed to launch jobs on a SGE cluster (https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SGECluster.html#dask_jobqueue.SGECluster) via dask.bags and/or dask.delayed.

Everything works nicely. However, some of my dask.delayed tasks should run in a specific queue on my SGE cluster (due to GPU availability, for instance, or some other requirement). How can I work this out in Dask?

In other words, how can I define a dask_jobqueue.SGECluster with multiple queues and/or different resource specs? dask_jobqueue.SGECluster allows me to configure only one cluster setup (one queue, one resource spec, etc.).

Thanks

lesteve commented 4 years ago

Having heterogeneous clusters was discussed in the past, for example in https://github.com/dask/distributed/issues/2118. AFAIK there is no easy way to do this at the moment. I am going to jot down a few partial notes that won't really answer your question but may hopefully help a bit.

My understanding is that the SpecCluster refactoring present in dask-jobqueue 0.7.0 could help in principle but I don't understand it well enough to be 100% sure. You could have a look at SpecCluster.
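
To give an idea of what I mean, here is a rough (untested) sketch of the shape of a heterogeneous SpecCluster spec, using plain local workers just for illustration; the entry names and the "GPU" label are arbitrary:

from dask.distributed import Scheduler, SpecCluster, Worker

# Each named entry can use its own worker class and options, which is what
# would in principle allow per-queue / per-resource worker specs.
scheduler = {"cls": Scheduler, "options": {"dashboard_address": ":8787"}}
workers = {
    "cpu-worker": {"cls": Worker, "options": {"nthreads": 1}},
    "gpu-worker": {"cls": Worker, "options": {"nthreads": 1, "resources": {"GPU": 1}}},
}
cluster = SpecCluster(scheduler=scheduler, workers=workers)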

Dask resources will be useful to "tag" certain workers with their type (i.e. GPU vs. CPU) and to tell which parts of your graph need which resources.
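
As a minimal sketch of the client side (train, data and scheduler_address below are placeholders): once some workers have been started with e.g. --resources GPU=1, a task can be pinned to them like this:

from dask.distributed import Client

client = Client(scheduler_address)  # scheduler_address is a placeholder

# Only workers that advertise a GPU resource will run this task
future = client.submit(train, data, resources={"GPU": 1})
result = future.result()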

Something you could try rather than using dask-jobqueue is to create your Dask cluster a bit more manually following: https://docs.dask.org/en/latest/setup/hpc.html#using-a-shared-network-file-system-and-a-job-scheduler

If you get something working to solve your problem, it would be really nice if you posted a message explaining your solution!

tiagofrepereira2012 commented 4 years ago

Hi @lesteve,

First, sorry for the long message. Second, I made it work on my SGE cluster, but I needed some workarounds.

The problem is twofold. The first part is getting dask-jobqueue to submit workers with different specifications. The second is assigning specific tasks to those particular workers.

To address part 1, I extended JobQueueCluster so that I can call the scale method with different specs and tag them. The class below allows me to do:

cluster = SGEIdiapCluster()
cluster.scale(n_jobs=1, queue="q_1day")                    # one spec
cluster.scale(n_jobs=1, queue="q_gpu", resources="GPU=1")  # another spec

import logging

import dask
from dask.distributed import Scheduler
from dask_jobqueue.core import Job, JobQueueCluster

logger = logging.getLogger(__name__)


class SGEIdiapCluster(JobQueueCluster):
    """Launch Dask jobs in the IDIAP SGE cluster"""

    def __init__(self, **kwargs):

        # Defining the job launcher
        self.job_cls = SGEIdiapJob

        # We could use self.workers to count the workers.
        # However, that attribute is updated asynchronously, so we can't
        # bootstrap several cluster.scale calls at once with it.
        self.n_workers_sync = 0

        # Hard-coding some scheduler info for the time being
        self.protocol = "tcp://"
        silence_logs = "error"
        dashboard_address = ":8787"
        interface = None
        host = None
        security = None

        scheduler = {
            "cls": Scheduler,  # Use local scheduler for now
            "options": {
                "protocol": self.protocol,
                "interface": interface,
                "host": host,
                "dashboard_address": dashboard_address,
                "security": security,
            },
        }

        # Spec cluster parameters
        loop = None
        asynchronous = False
        name = None

        # Call the SpecCluster constructor directly (bypassing JobQueueCluster.__init__)
        super(JobQueueCluster, self).__init__(
            scheduler=scheduler,
            worker={},
            loop=loop,
            silence_logs=silence_logs,
            asynchronous=asynchronous,
            name=name,
        )

    def scale(self, n_jobs, queue=None, memory="4GB", io_big=True, resources=None):
        """
        Launch SGE jobs in the Idiap SGE cluster

        Parameters
        ----------

          n_jobs: int
            Number of jobs to be launched

          queue: str
            Name of the SGE queue

          io_big: bool
            Whether to request io_big. Note that this is only valid for q_1day, q1week, q_1day_mth, q_1week_mth

          resources: str
            Tag your workers with a meaningful name (e.g. GPU=1). This makes it possible to direct certain tasks to certain workers.

        """

        if n_jobs == 0:
            # Shutting down all workers
            return super(JobQueueCluster, self).scale(0, memory=None, cores=0)

        resource_spec = f"{queue}=TRUE"  # We have to set this at Idiap for some reason
        resource_spec += ",io_big=TRUE" if io_big else ""
        log_directory = "./logs"
        n_cores = 1
        worker_spec = {
            "cls": self.job_cls,
            "options": {
                "queue": queue,
                "memory": memory,
                "cores": n_cores,
                "processes": n_cores,
                "log_directory": log_directory,
                "local_directory": log_directory,
                "resource_spec": resource_spec,
                "interface": None,
                "protocol": self.protocol,
                "security": None,
                "resources": resources,
            },
        }

        # Defining a new worker_spec with some SGE characteristics
        self.new_spec = worker_spec

        # Launching the jobs according to the new worker_spec
        n_workers = self.n_workers_sync
        self.n_workers_sync += n_jobs
        return super(JobQueueCluster, self).scale(
            n_workers + n_jobs, memory=None, cores=n_cores
        )

To solve part 2 (assigning specific tasks to specific workers), it's necessary to pass a tag to the --resources argument of the python -m distributed.cli.dask_worker script. To do that (and this IS NOT a clean solution), I added an argument called resources as a kwarg passed to dask_jobqueue.core.Job, so that I can set the --resources option. The code below does that.

class SGEIdiapJob(Job):
    """
    Launches an SGE job in the IDIAP cluster.
    This class basically encodes the CLI command that bootstraps the worker
    in an SGE job. See `https://distributed.dask.org/en/latest/resources.html#worker-resources` for more information.

    .. note:: This class is temporary. It's basically a copy of SGEJob from dask_jobqueue.
              The difference is that here I'm also handling the Dask worker resources tag (which is not handled anywhere). This has to be patched in the Job class. Please follow `https://github.com/dask/dask-jobqueue/issues/378` for news about this patch.

    """
    submit_command = "qsub"
    cancel_command = "qdel"

    def __init__(
        self,
        *args,
        queue=None,
        project=None,
        resource_spec=None,
        walltime=None,
        job_extra=None,
        config_name="sge",
        **kwargs
    ):
        if queue is None:
            queue = dask.config.get("jobqueue.%s.queue" % config_name)
        if project is None:
            project = dask.config.get("jobqueue.%s.project" % config_name)
        if resource_spec is None:
            resource_spec = dask.config.get("jobqueue.%s.resource-spec" % config_name)
        if walltime is None:
            walltime = dask.config.get("jobqueue.%s.walltime" % config_name)
        if job_extra is None:
            job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)

        super().__init__(*args, config_name=config_name, **kwargs)

        # Amending the --resources in the `distributed.cli.dask_worker` CLI command
        if "resources" in kwargs and kwargs["resources"]:
            resources = kwargs["resources"]
            self._command_template += f" --resources {resources}"

        header_lines = []
        if self.job_name is not None:
            header_lines.append("#$ -N %(job-name)s")
        if queue is not None:
            header_lines.append("#$ -q %(queue)s")
        if project is not None:
            header_lines.append("#$ -P %(project)s")
        if resource_spec is not None:
            header_lines.append("#$ -l %(resource_spec)s")
        if walltime is not None:
            header_lines.append("#$ -l h_rt=%(walltime)s")
        if self.log_directory is not None:
            header_lines.append("#$ -e %(log_directory)s/")
            header_lines.append("#$ -o %(log_directory)s/")
        header_lines.extend(["#$ -cwd", "#$ -j y"])
        header_lines.extend(["#$ %s" % arg for arg in job_extra])
        header_template = "\n".join(header_lines)

        config = {
            "job-name": self.job_name,
            "queue": queue,
            "project": project,
            "processes": self.worker_processes,
            "walltime": walltime,
            "resource_spec": resource_spec,
            "log_directory": self.log_directory,
        }
        self.job_header = header_template % config
        logger.debug("Job script: \n %s" % self.job_script())

With this I was able to do the task placement proposed in https://distributed.dask.org/en/latest/resources.html#resources-with-collections.
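
For illustration, here is roughly what that placement looks like with a dask.bag (samples and extract_on_gpu are placeholders, not names from this issue; the GPU=1 tag has to match what the workers were started with):

import dask.bag as db

# Build a bag whose tasks should only run on GPU-tagged workers
bag = db.from_sequence(samples).map(extract_on_gpu)

# Pin the bag's tasks to workers that advertise a GPU resource
result = bag.compute(resources={tuple(bag.__dask_keys__()): {"GPU": 1}})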

I think we should patch dask_jobqueue.core.Job to allow us to set the resources option. What do you think?

lesteve commented 4 years ago

Good to see that you found a work-around that works for you!

There is a related discussion in #381. I investigated the SpecCluster way a bit and added some comments in https://github.com/dask/dask-jobqueue/issues/381#issuecomment-584147406.

My original idea was to keep scale simple and have a "composite" job object that, behind the scenes, can launch multiple jobs in separate queues. This would mean that you cannot scale the number of jobs per queue separately, only the total number of jobs. In other words, the per-queue ratio of jobs would stay fixed in this model. Would that be a problem for you?

> I think we should patch dask_jobqueue.core.Job to allow us to set the resources option. What do you think?

Dask resources can be passed via the extra argument, but maybe I am missing something?
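
For example, something along these lines should forward the tag to the dask-worker command line (an untested sketch; the queue name, resource_spec and GPU=1 label are site-specific):

from dask_jobqueue import SGECluster

gpu_cluster = SGECluster(
    queue="q_gpu",
    cores=1,
    memory="4GB",
    resource_spec="q_gpu=TRUE",
    extra=["--resources", "GPU=1"],  # appended to the dask-worker command
)
gpu_cluster.scale(jobs=2)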