facebookincubator / submitit

Python 3.8+ toolbox for submitting jobs to Slurm
MIT License

Command for after srun executes #13

Open · jgbos opened this issue 3 years ago

jgbos commented 3 years ago

I currently use the setup parameter to include commands in the sbatch script before srun runs, but is it possible to add commands to be executed after srun?
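
For reference, a minimal sketch of how the setup parameter is used today (with AutoExecutor the option is prefixed as slurm_setup; the module load line is just a placeholder):

import submitit

def train() -> str:
    # stand-in for the real workload
    return "done"

executor = submitit.AutoExecutor(folder="log_submitit")
executor.update_parameters(
    timeout_min=60,
    # these lines are written into the generated sbatch script before the srun call
    slurm_setup=["module load cuda"],
)
job = executor.submit(train)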

jrapin commented 3 years ago

It's not possible for now, but it should be easy enough to add if need be. What's your use case though, in case it can be dealt with in some other way?

jgbos commented 3 years ago

We are adding logging scripts to monitor load, memory, GPU stats, etc. via dstat and dcgmi. I need to start the logging before srun and shut it down afterwards.

jgbos commented 3 years ago

FYI, I wrote a wrapper around my cmd to run my commands in a subprocess before and after the actual command on the rank zero node. This seems to be working.

jrapin commented 3 years ago

OK, more questions: would you need it for all executors or for Slurm only? (setup is Slurm only.) Could this be performed in Python, or does it need to be in command lines? I am thinking of some kind of decorator/context manager.

jrapin commented 3 years ago

Typically, ContextDecorator would seem like a good fit. What do your logs look like? If you have something robust, maybe we can eventually include it as a helper in submitit.
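
For context, a minimal sketch of what a ContextDecorator-based helper could look like (the monitor name and the dstat command are illustrative, not an existing submitit API):

import contextlib
import subprocess

class monitor(contextlib.ContextDecorator):
    """Start a monitoring process on entry and stop it on exit (illustrative only)."""

    def __enter__(self):
        # launch dstat in the background and keep a handle to it
        self._proc = subprocess.Popen(["dstat", "-cdlmnr", "--output", "monitor_dstat_logfile.csv"])
        return self

    def __exit__(self, *exc):
        self._proc.terminate()  # stop the monitor even if the decorated code raised
        return False  # do not swallow exceptions

@monitor()
def train():
    ...  # actual workload goes here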

jgbos commented 3 years ago

So these logging scripts are not Slurm specific, but Slurm is all I have access to right now. I like the decorator idea. Here is the class I made to wrap my cmd function:

class JobMonitor(object):
    """JobMonitor will monitor memory, cpu, and gpu states.

    The monitors must be started before the wrapped command and stopped
    afterwards; this is only done on the rank-zero process.
    """
    def __init__(self, fn):
        self.fn = fn
        self.dstat_proc = None

    def __call__(self, *args, **kwargs):
        import os
        rank_zero = False
        if 'SLURM_PROCID' in os.environ:
            rank = int(os.getenv('SLURM_PROCID'))
            if rank == 0:
                rank_zero = True

        if rank_zero:
            self.before_srun()

        output = self.fn(*args, **kwargs)

        if rank_zero:
            self.post_srun()

        return output

    def before_srun(self):
        import os
        import subprocess
        job_id = os.environ['SLURM_JOBID']  # env vars are not expanded in Popen/run arg lists
        # keep a handle on the background dstat process so it can be stopped later
        self.dstat_proc = subprocess.Popen(['dstat', '-cdlmnr', '--output', 'monitor_dstat_logfile.csv'])
        # wait for the group setup commands to finish before enabling/starting stats
        subprocess.run(['dcgmi', 'group', '-c', 'allgpus', '--default'])
        subprocess.run(['dcgmi', 'stats', '-g', '2', '-e'])
        subprocess.run(['dcgmi', 'stats', '-g', '2', '-s', job_id])

    def post_srun(self):
        import os
        import subprocess
        job_id = os.environ['SLURM_JOBID']
        subprocess.run(['dcgmi', 'stats', '-x', job_id, '-v'])
        with open('monitor_gpu_logfile', 'w') as outfile:
            subprocess.run(['dcgmi', 'stats', '-j', job_id, '-v'], stdout=outfile)
        # terminate the dstat process directly instead of shelling out to `kill $(jobs -pr)`
        if self.dstat_proc is not None:
            self.dstat_proc.terminate()
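
If it helps, a hedged sketch of how such a wrapper would typically be applied at submission time (train is a placeholder for the function you would otherwise submit directly):

import submitit

def train() -> str:
    # placeholder for the real workload
    return "done"

executor = submitit.AutoExecutor(folder="log_submitit")
executor.update_parameters(timeout_min=60)
# submit the wrapped callable; JobMonitor.__call__ starts/stops the monitors around it
job = executor.submit(JobMonitor(train))
print(job.result())
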
jrapin commented 3 years ago

This looks like it can most definitely be framed as a context-manager decorator indeed. How standard are dstat and dcgmi? I am not familiar with them. I have heard about Python plugins that may be able to do the job as well (although it may not necessarily be simpler, since we need to spawn processes). Can you provide a sample of an output?

jgbos commented 3 years ago

I'm not too familiar with the commands, as I was asked to include these logs for someone else who is interested in them. I think dstat can be installed on any Linux machine, and dcgmi is an NVIDIA product. The dstat output is a text file with essentially a top-like output of CPU and memory usage over time; similarly for dcgmi, but for GPUs. I'm swamped with some other stuff, but I'll try to get you a sample output soon.

jrapin commented 3 years ago

I've given it a try. If this works for you so far, I think it's a good option for now. I have a few minor concerns about edge cases if we want to make it work at a bigger scale:

- with or without context manager, I'm a bit concerned by what happens in case of cancellation/timeout/preemption, since the "post" method may not be called
- the decorator needs to forward the checkpoint method of the callable if there is one, to handle preemption. Right now, that requires an ugly hack :s

Below is the code I am using to test; it does not work perfectly and is a bit too verbose (probably because I'm trying to avoid Popen). I'll run more tests later if you are not in any hurry.

  import subprocess
  import submitit

  class with_stats:

      def __init__(self, func):
          self._func = func

      def __call__(self, *args, **kwargs):
          with self:
              output = self._func(*args, **kwargs)
          return output

      def checkpoint(self, *args, **kwargs):
          if not hasattr(self._func, "checkpoint"):
              return None
          output = self._func.checkpoint(*args, **kwargs)
          # re-wrap the resubmitted function so the stats logging survives requeueing
          output.function = self.__class__(output.function)  # ugly hack
          return output

      def __enter__(self):
          print('Starting')
          env = submitit.JobEnvironment()
          if not env.global_rank:
              self.process = subprocess.Popen(['dstat', '-cdlmnr', '--output', 'monitor_dstat_logfile.csv'])
              subprocess.run(['dcgmi', 'group', '-c', 'allgpus', '--default'])
              subprocess.run(['dcgmi', 'stats', '-g', '2', '-e'])
              subprocess.run(['dcgmi', 'stats', '-g', '2', '-s', str(env.job_id)])
          return self

      def __exit__(self, *exc):
          env = submitit.JobEnvironment()
          if not env.global_rank:
              subprocess.run(['dcgmi', 'stats', '-x', str(env.job_id), '-v'])
              with open('monitor_gpu_logfile', 'w') as outfile:
                  subprocess.run(['dcgmi', 'stats', '-j', str(env.job_id), '-v'], stdout=outfile)
              self.process.terminate()
          print('Finishing')
          return False
gwenzek commented 3 years ago

> is it possible to add commands to be executed after srun?

Given the rest of the discussion, this is not needed anymore? I don't think I see a good use case for it. Any command can be run from Python, so you should be able to run your commands inside the submitted function:

def run():
  env = submitit.JobEnvironment()
  with dstat('monitor_dstat_logfile.csv'):
      dcgmi(env.job_id)
      return train_model()
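
The dstat and dcgmi helpers above are not part of submitit; a rough sketch of how the dstat one could be written (a contextlib.contextmanager around Popen, reusing the flags and file name from earlier in the thread):

import contextlib
import subprocess

@contextlib.contextmanager
def dstat(output_csv: str):
    # start dstat in the background, writing its stats to output_csv
    proc = subprocess.Popen(["dstat", "-cdlmnr", "--output", output_csv])
    try:
        yield proc
    finally:
        proc.terminate()  # stop the monitor when the block exits, even on error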

As outlined by Jérémy, the issue is that it is a bit verbose to wrap an existing Checkpointable. Maybe Checkpointable should have more hooks (on_startup, on_restart, on_timeout, on_job_done, ...), but this begins to be very framework-ish.
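
A very rough sketch of what such hooks might look like; none of these hook names exist in submitit today, and the job runner would have to be taught to call them:

import submitit

class MonitoredJob(submitit.helpers.Checkpointable):
    """Hypothetical interface: extra hooks a future Checkpointable could expose."""

    def on_startup(self) -> None: ...    # would run before __call__
    def on_restart(self) -> None: ...    # would run when a preempted job is requeued
    def on_timeout(self) -> None: ...    # would run just before the job times out
    def on_job_done(self) -> None: ...   # would run after __call__ returns

    def __call__(self) -> str:
        return "trained"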

gwenzek commented 3 years ago

> with or without context manager, I'm a bit concerned by what happens in case of cancellation/timeout/preemption, since the "post" method may not be called

I think it works fine, at least with a LocalExecutor; I haven't confirmed on SLURM yet.

import time
from pathlib import Path

from submitit.local import local  # imports assumed by this test

def test_cleanup_is_done_on_kill(tmp_path: Path) -> None:
    def touch_then_cleanup(tmp_file: Path):
        tmp_file.touch()
        try:
            time.sleep(10)
        finally:
            tmp_file.unlink()

    executor = local.LocalExecutor(tmp_path)

    job = executor.submit(touch_then_cleanup, tmp_path / "cancel.txt")
    time.sleep(1)
    assert (tmp_path / "cancel.txt").exists()
    job.cancel()
    time.sleep(1)
    assert not (tmp_path / "cancel.txt").exists()

    job = executor.submit(touch_then_cleanup, tmp_path / "timeout.txt")
    time.sleep(1)
    assert (tmp_path / "timeout.txt").exists()
    job._interrupt(timeout=True)  # type: ignore
    time.sleep(1)
    assert not (tmp_path / "timeout.txt").exists()

    job = executor.submit(touch_then_cleanup, tmp_path / "interrupt.txt")
    time.sleep(1)
    assert (tmp_path / "interrupt.txt").exists()
    job._interrupt()  # type: ignore
    time.sleep(1)
    assert not (tmp_path / "interrupt.txt").exists()
gwenzek commented 3 years ago

> the decorator needs to forward the checkpoint method of the callable if there is one, to handle preemption. Right now, that requires an ugly hack :s

I don't think it's ugly; it looks like regular monad stuff to me: checkpoint(with_stats(f)) := with_stats(checkpoint(f))

jrapin commented 3 years ago

> I think it works fine, at least with a LocalExecutor; I haven't confirmed on SLURM yet.

I've seen weird stuff on Slurm. It seemed to work OK-ish for preemption/timeout, although I'm a bit concerned that the spawned process does not get killed directly and may delay completion, and it probably does not work for cancellation (then again, handling cancellation may not be a good idea).

> I don't think it's ugly

OK, maybe it's alright (I'm not familiar with monad stuff, though); then again, it may require some helpers to ease development.

> Given the rest of the discussion, this is not needed anymore?

In my opinion it's much more generic with decorators, so it's probably not needed anymore.

> Maybe the Checkpointable should have more hooks

I'm wondering about it, or about having separate objects for setup/teardown. That may (or may not) be useful for copying the directory when submitting a function... I'm wondering if we could make some decorator-like framework work for this as well, since it is a major concern for many users.

LTMeyer commented 3 years ago

Dear all, first let me thank you very much for your work. I am posting my issue here because it is related to the current one, so this seems like the most appropriate place.

I am using submitit through the hydra plugin to run deep learning experiment on a cluster. The nodes of the cluster do not have access to the internet for safety matters. I would like to upload a posteriori logs of my experiments to a platform like wandb, from other specific nodes that can actually connect to external servers. A possible solution relying only on slurm would be to execute a bash script containing multiple sbatch commands specifying the dependency between jobs via the --dependency:afterwork option. How can I achieve the same using submitit and the associated plugin in hydra?