riga / law

Build large-scale task workflows: luigi + job submission + remote targets + environment sandboxing using Docker/Singularity
http://law.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Remote SLURM job submission #157

Closed: tgy closed this issue 1 year ago

tgy commented 1 year ago

In law/contrib/slurm/, we assume that the machine from which we submit the jobs has direct access to SLURM and its associated commands such as sbatch.

In many cases, we want to submit SLURM jobs to a remote SLURM cluster. This means that we have to first SSH to the remote machine, and submit a job there, using the command already created by law.

What would be a way to extend law to add the possibility to send the job file through scp and execute the submission command over SSH?

Thanks a lot!

riga commented 1 year ago

Hey @tgy!

Thanks for your interest in law :)

What would be a way to extend law to add the possibility to send the job file through scp and execute the submission command over SSH?

I had something like this in mind at the beginning when I looked into the different job submission interfaces, but I came to the conclusion that such a feature would entail a bunch of complications that I didn't want to deal with (at least so far... let's see).

For instance, on our SLURM cluster, the files that make up a job (job description, executable, inputs) need to be accessible from the machine that runs the submission commands and outputs end up there as well, and the software environment on that machine can differ from the one on the machine that started the workflow, just to name two things.

The LOCAL NODE -> SSH NODE -> SLURM NODE submission method that you suggested would have to deal with that, i.e., potentially transport files LOCAL -> SSH and SSH -> LOCAL once jobs finish, and account for possibly diverging environments between LOCAL NODE and SSH NODE.

Just one example: remote jobs require law as well, as they trigger tasks the same way that you would do locally; for this to work via SLURM over SSH you would need to make sure that law is available on the SSH NODE.

I think there is no way around having to manually set things up (environment, software, code) on the SSH node first, but that also gives you control over every aspect. Then, one either connects to the node by hand and runs law there (I agree, not very automated), or we think of a method to have one law installation on LOCAL NODE talk to another installation on SSH NODE to trigger tasks there.

tgy commented 1 year ago

Thanks @riga for your detailed answer. You perfectly described my problem. Also, I'm really impressed by this piece of work you managed to put together.

To describe my problem, here's a little drawing

[drawing: the Orchestrator, Lab, and Hospital nodes and how they can reach each other]

I'm in a bit of a complicated situation where I'm trying to build a small MLOps system for ML medical research at a Parisian hospital, in collaboration with another Lab.

The Lab has its own private network, which I could reach over SSH from my main Internet-connected orchestrator (where the main luigi scheduler should remain, and where luigid will be running) through a VPN. But I'd rather have a process running on the Internet-connected Lab machine that pulls jobs to execute from the orchestrator, because it's painful to make the orchestrator talk to the Lab node due to the private network.

At the hospital where data resides, it's even worse: there's no VPN whatsoever. So I need to set up a system such that the Hospital nodes are going to pull jobs that need to be executed from the orchestrator.

Note that I can set up any environment I need (with law, of course) on either the Hospital nodes or the Lab node (which has direct access to the SLURM commands). Note as well that both the Hospital and Lab nodes are connected to the Internet, so I can work in one direction at least.

Now I'm trying to find the best tool for my needs.

This idea of having several law installations running on several machines and communicating with each other is really interesting, and sounds like what I would need in this case: law_Hospital, law_Orchestrator, law_Lab would be able to communicate to schedule jobs and pass on results and task/job states.

I'm trying to understand whether law is the right tool for my situation. I'm also considering Prefect, but I like luigi and I like how law is conceptualised. Still, if setting this up with law requires too much work, I'll have to use something else I think :/ What are your thoughts?

Thanks a lot for your time.

riga commented 1 year ago

Interesting!

If creating multiple (compatible) setups is not an issue for you, then I think that luigi's assistant worker feature is exactly what you are looking for.

The only problem is that the feature has never been properly documented as it has always been considered an experimental feature (old question, GH search), but looking at the code again I think it should work out of the box for you.

  1. You start a central scheduler server on your orchestrator, including a "resources" configuration with, say, slurm = 0 (see the config sketch right after this list).
  2. You start a task on your local machine that would actually submit slurm jobs. However, this task needs to identify whether it has access to slurm itself. If not, it sets its resources(*) to {"slurm": 1}. Since the scheduler has 0 slurm resources available, it won't assign the task to your local worker, so no local slurm submission takes place.
  3. You have a permanent worker running on your "ssh machine with slurm", with the --assistant parameter enabled. When running (it has --worker-keep-alive enabled by default, so it can always run in the background) and talking to the same scheduler, the scheduler will try to assign the task to the assistant worker since it has to reject your local worker (due to the resource mismatch). Your ssh worker has access to slurm, so the resource of the task on that machine is {"slurm": 0}, and the task will actually run, submit jobs, etc.

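As a minimal sketch of the scheduler side of step 1 (assuming the central scheduler is started via luigid and reads its resource limits from the [resources] section of its luigi configuration file; the exact file name and location depend on your setup):

[resources]
slurm = 0

The per-task counterpart is the process_resources hook shown at the end of this comment.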
I haven't used the assistant feature so far, and I'd have a couple of questions myself (e.g., how do you start the assistant worker? do you have to run the exact same luigi / law run command, just with --assistant in addition? I actually don't think so). But reading the doc strings in the implementation, I think the initial luigi devs had scenarios in mind that are extremely similar to yours. In case this is true and adjustments in law could make the assistant feature easy to use, I'm happy to help.


(*) You can define per-task resources via

import luigi

class MyTask(luigi.Task):

    def process_resources(self):
        # site-specific check whether this worker can reach slurm
        has_access_to_slurm = ...  # TODO
        return {"slurm": 0 if has_access_to_slurm else 1}

tgy commented 1 year ago

Thanks a lot for your answer. I stumbled upon this "assistant" concept in luigi several times but had no idea what it meant.

From my research, it seems possible to run luigi in --assistant mode by passing a dummy Task (see here).
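For reference, a rough sketch of that dummy-task approach (untested in this setup; the task name and the scheduler URL are placeholders):

import luigi

class Noop(luigi.Task):
    # dummy task that is always complete; its only purpose is to bring up a worker
    def complete(self):
        return True

if __name__ == "__main__":
    # --assistant lets the worker pick up any runnable task from the central scheduler,
    # --worker-keep-alive keeps it alive once Noop is (trivially) done
    luigi.run([
        "Noop",
        "--assistant",
        "--worker-keep-alive",
        "--scheduler-url", "http://orchestrator.example:8082",  # placeholder URL
    ])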

Thanks for the resource idea. This solves exactly one of the problems I had in mind: how to make sure that tasks are executed in the right environment. Another example is the tasks that process patient data: they need to run on a Hospital node, where the data is available (and should remain, obviously).
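The same resource trick could also express that constraint; here is a sketch assuming a made-up hospital_data resource that is set to 0 in the scheduler's [resources] section, and that Hospital nodes can be recognised by their hostname:

import socket

import luigi

class ProcessPatientData(luigi.Task):
    # hypothetical task that must only run where the patient data lives

    def process_resources(self):
        # crude site check: only Hospital nodes may actually run this task
        on_hospital_node = socket.gethostname().startswith("hospital-")
        return {"hospital_data": 0 if on_hospital_node else 1}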

Maybe we can implement some sort of law "assistant"? One that just waits for jobs sent by the central scheduler.

Just one thing that's a bit unclear to me: I guess the central scheduler and the workers (on whatever node they're running) must somehow share the same definitions of tasks, workflows, and jobs. How is that done exactly? What if I have a worker running on the Lab node for 2 weeks, consuming tasks, and the code has changed in the meantime, with new Tasks, maybe some Parameters added to some Task, etc.? How is that solved exactly? Should I just have a git repository that's constantly maintained at the same version across all workers?

tgy commented 1 year ago

I also learned that if I want to schedule some tasks from my laptop (on which no task should actually be executed), I simply need to pass --workers 0. I found this in the mailing list rather than the official documentation :confused:
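For the record, such a schedule-only invocation could look roughly like this (module and task names are placeholders):

luigi --module mylib.pipeline.tasks ProcessPatientData --workers 0 --scheduler-url http://orchestrator.example:8082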

tgy commented 1 year ago

@riga We've decided to go with luigi+law!

Our needs are probably a bit different from those of you physics people at CERN!

I'll keep you posted, and we'll contribute to the law library as we go (if our changes make sense), especially by adding documentation and usage examples.

I'll probably have questions to ask, because there are still many things that I don't understand yet about law and luigi.

But regarding the initial issue here, this seems to be solved by the "assistant" worker mode. Note however that passing --assistant is not sufficient: I also had to pass --worker-keep-alive, otherwise the worker would simply stop after executing the empty Task.

riga commented 1 year ago

Glad to hear! Feel free to ask questions any time!

tgy commented 11 months ago

Hi @riga,

Just so you know, I've created some sort of agent called luigic that's running in assistant mode, waiting for tasks to arrive, and updating its code base from a git repository as it runs (hot reloading).

Whenever a new commit hash arrives on the git remote, the luigi.Worker is stopped (if it has no running task) and restarted (with the newly loaded version of the code). Not sure this is actually necessary, but my guess is that for the new code to be passed to the worker, its thread needs to be stopped and restarted (maybe not true).

This is a preliminary implementation. I'm still testing if this really works well.

Given your extensive knowledge of luigi, would you mind telling me if you think this approach makes sense? Would love your input.

Here's an excerpt of the code:

import asyncio
import signal
import socket
import threading
from pathlib import Path

import git
import luigi.interface
import luigi.lock
import luigi.rpc
import luigi.worker
from loguru import logger

import mylib
from mylib.pipeline.config import C
from mylib.pipeline.util import deepreload
from mylib.pipeline.util.log import setup_loguru_interception

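# Worker subclass pinned to assistant + keep-alive mode; run() is executed in a
# background thread so the asyncio loop below can stop it and create a fresh
# instance whenever the code base is hot reloaded.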
class Worker(luigi.worker.Worker):
    def __init__(self, scheduler, worker_processes):
        super().__init__(
            scheduler=scheduler,
            worker_processes=worker_processes,
            assistant=True,
            keep_alive=True,
        )
        self.should_run = True
        self.run_thread = None

    def run_in_thread(self):
        if not self.should_run:
            raise ValueError("cannot run worker that should not be running")
        if self.run_thread is not None:
            raise RuntimeError(
                "cannot start worker that is already running in a thread"
            )
        self.run_thread = threading.Thread(target=self.run)
        self.run_thread.start()

    def stop_thread(self):
        if self.run_thread is None:
            logger.warning("attempting to stop a worker thread but it is not running")
            return
        self.should_run = False
        self._start_phasing_out()
        self.run_thread.join()
        self.run_thread = None

class WorkerSchedulerFactory:
    def __init__(
        self,
        url: str,
        worker_processes: int,
    ) -> None:
        self.scheduler = self.create_remote_scheduler(url)
        self.worker_processes = worker_processes

    def create_remote_scheduler(self, url: str) -> luigi.rpc.RemoteScheduler:
        return luigi.rpc.RemoteScheduler(url)

    def create_worker(self) -> Worker:
        return Worker(
            scheduler=self.scheduler,
            worker_processes=self.worker_processes,
        )

def _get_ssh_command() -> str:
    git_ssh_identity_file = C.path_to_git_deploy_private_key
    git_ssh_cmd = f"ssh -i {git_ssh_identity_file}"
    return git_ssh_cmd

def _debug_clone_repo(repo_path: Path) -> None:
    git.Repo.clone_from(
        C.mylib_repo_url,
        repo_path,
        branch="main",
        env={"GIT_SSH_COMMAND": _get_ssh_command()},
    )

def _update_code_from_git(repo_local_path: Path, branch_name: str) -> bool:
    code_updated = False
    repo = git.Repo(repo_local_path)
    cur_head_hash = repo.head.commit.hexsha
    logger.debug(
        f"fetching origin/{branch_name} for repo at {repo_local_path}. "
        f"current HEAD hash is {cur_head_hash}"
    )
    with repo.git.custom_environment(GIT_SSH_COMMAND=_get_ssh_command()):
        repo.remotes.origin.fetch(branch_name)
    new_hash = repo.remotes.origin.refs[branch_name].commit.hexsha
    if cur_head_hash != new_hash:
        logger.info(
            f"new hash {new_hash} found for origin/{branch_name}. "
            "code will be hard reset and hot reloaded"
        )
        repo.head.reset(new_hash, hard=True, index=True, working_tree=True)
        code_updated = True
    else:
        logger.debug("hash did not change. not doing anything")
    return code_updated

async def periodically_update_code_from_git(
    repo_local_path: Path,
    branch_name: str,
    code_reload_event: asyncio.Event,
    frequency_sec: float | int,
) -> None:
    try:
        while True:
            logger.debug("fetching code from git")
            loop = asyncio.get_event_loop()
            code_updated = await loop.run_in_executor(
                None,
                _update_code_from_git,
                repo_local_path,
                branch_name,
            )
            if code_updated:
                code_reload_event.set()
            await asyncio.sleep(frequency_sec)
    except asyncio.CancelledError:
        logger.debug("stopped periodic code update from git")
        raise

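# Main loop: acquire luigi's pid lock, start an assistant worker in a background
# thread, wait for a code-reload event, then stop the worker, deep-reload mylib
# and start a fresh worker with the new code.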
async def get_shit_done(
    code_reload_event: asyncio.Event,
    deep_reload_module: str = "mylib.pipeline",
    allow_reload: str = "mylib",
):
    logger.debug("setting up luigic main loop. getting shit done")
    env_params = luigi.interface.core()
    kill_signal = signal.SIGUSR1 if env_params.take_lock else None
    if not env_params.no_lock and not (
        luigi.lock.acquire_for(
            env_params.lock_pid_dir,
            env_params.lock_size,
            kill_signal,
        )
    ):
        raise luigi.interface.PidLockAlreadyTakenExit()
    url = C.luigi_scheduler_url
    factory = WorkerSchedulerFactory(url=url, worker_processes=1)
    logger.debug("starting worker in separate thread")
    try:
        while True:
            worker = factory.create_worker()
            worker.run_in_thread()
            logger.info("starting code reload loop")
            await code_reload_event.wait()
            logger.info("code reload event received")
            worker.stop_thread()
            logger.info("worker stopped. will deep reload mylib")
            deepreload.reload(deep_reload_module, allow_reload=allow_reload)
            code_reload_event.clear()
    except asyncio.CancelledError:
        logger.info("task cancelled")
        worker.stop_thread()
        raise

async def main():
    setup_loguru_interception()
    repo_local_path = Path(mylib.__path__[0]).parent
    if not (repo_local_path / ".git").is_dir():
        raise RuntimeError(f"could not find git repo in {repo_local_path}")
    branch_name = "main"
    logger.debug(f"using repo located at {repo_local_path} and branch {branch_name}")
    logger.debug("starting luigic")
    code_reload_event = asyncio.Event()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(
            periodically_update_code_from_git(
                repo_local_path,
                branch_name,
                code_reload_event,
                frequency_sec=5,
            )
        )
        tg.create_task(get_shit_done(code_reload_event))

riga commented 11 months ago

Thanks for the follow-up, that's interesting!

From the luigi side I think this looks very good!