lartpang / RunIt

A simple program scheduler for your code on different devices.
MIT License

Stopping the command does not kill all the processes #3

Closed: BitCalSaul closed this issue 1 week ago

BitCalSaul commented 4 months ago

I run RunIt in tmux or zsh. I found that even though I stop the RunIt command with ctrl-c, the processes are still there and take up GPU memory with 0% utilization. I have to kill the processes one by one.

image

I'm wondering how to kill all the processes in a quick way.

lartpang commented 4 months ago

@BitCalSaul

Do you use the latest commit in the branch dev and the torchrun command?

If so, maybe you can try the following version. Here I have introduced the signal module to handle the KeyboardInterrupt signal.

# -*- coding: utf-8 -*-
# @Time    : 2021/3/6
# @Author  : Lart Pang
# @GitHub  : https://github.com/lartpang

import argparse
import os
import signal
import subprocess
import time
from enum import Enum
from multiprocessing import Manager, Pool, freeze_support
from queue import Queue

import yaml

class STATUS(Enum):
    WAITING = 0
    RUNNING = 1
    DONE = 2
    FAILED = 3

def worker(cmd: str, gpu_ids: str, queue: Queue, job_id: int, done_jobs: dict):
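    # Run one command in a subprocess restricted to the assigned GPUs, then return those GPUs to the shared queue.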
    job_identifier = f"[Job-{job_id}:GPU-{gpu_ids}]"

    # Set the environment variables for the subprocess so that it only sees the assigned GPUs.
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = gpu_ids

    # Use subprocess.Popen instead of subprocess.run(cmd, shell=True, check=True, env=env)
    # so that the process object is available for termination if the command fails.
    with subprocess.Popen(cmd, shell=True, env=env) as sub_proc:
        try:
            print(f"{job_identifier} Executing {cmd}...")
            sub_proc.wait()
            done_jobs[job_id] = STATUS.DONE
        except Exception as e:
            print(f"{job_identifier} Command '{cmd}' failed: {e}")
            sub_proc.terminate()
            done_jobs[job_id] = STATUS.FAILED

    # Return the GPU resources to the shared queue.
    for gpu in gpu_ids.split(","):
        queue.put(gpu)
    print(f"{job_identifier} Release GPU {gpu_ids}...")

def get_args():
    # fmt: off
    parser = argparse.ArgumentParser()
    parser.add_argument("--gpu-pool", nargs="+", type=int, default=[0], help="The pool containing all ids of your gpu devices.")
    parser.add_argument("--max-workers", type=int, help="The max number of the workers.")
    parser.add_argument("--cmd-pool",type=str, required=True, help="The path of the yaml containing all cmds.")
    parser.add_argument("--interval-for-waiting-gpu",type=int, default=3, help="In seconds, the interval for waiting for a GPU to be available.")
    parser.add_argument("--interval-for-loop",type=int, default=1, help="In seconds, the interval for looping.")
    # fmt: on

    args = parser.parse_args()
    if args.max_workers is None:
        args.max_workers = len(args.gpu_pool)
    return args

def init_worker():
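    # Worker processes ignore SIGINT so that Ctrl-C is handled only by the main process,
    # which can then terminate the pool cleanly.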
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def main():
    args = get_args()
    print("[YOUR CONFIG]\n" + str(args))

    with open(args.cmd_pool, mode="r", encoding="utf-8") as f:
        jobs = yaml.safe_load(f)
    assert isinstance(jobs, (tuple, list)), jobs
    print("[YOUR CMDS]\n" + "\n\t".join([str(job) for job in jobs]))

    manager = Manager()
    # Create a cross-process shared queue to track the available GPU resources.
    available_gpus = manager.Queue()
    for i in args.gpu_pool:
        available_gpus.put(str(i))
    # Create a cross-process shared dict to track the completed commands.
    done_jobs = manager.dict()
    for job_id, job_info in enumerate(jobs):
        if job_info["num_gpus"] > len(args.gpu_pool):
            raise ValueError(f"The number of gpus in job {job_id} is larger than the number of available gpus.")
        done_jobs[job_id] = STATUS.WAITING

    # Register the signal handler before creating the process pool so that cleanup
    # can be performed when an interrupt signal is received.
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(processes=args.max_workers, initializer=init_worker)
    # Restore the original signal handler.
    signal.signal(signal.SIGINT, original_sigint_handler)

    try:
        # Loop over the commands until all of them have been processed.
        while not all([status is STATUS.DONE for status in done_jobs.values()]):
            for job_id, job_info in enumerate(jobs):
                if done_jobs[job_id] in [STATUS.DONE, STATUS.RUNNING]:
                    continue
                # else: STATUS.WAITING, STATUS.FAILED

                # job_name = job_info["name"]
                command = job_info["command"]
                num_gpus = job_info["num_gpus"]

                num_available_gpus = available_gpus.qsize()
                # If enough GPU resources are currently available, run the command.
                if num_gpus <= num_available_gpus:
                    done_jobs[job_id] = STATUS.RUNNING
                    # Take the required GPUs from the queue of available resources.
                    gpu_ids = ",".join([available_gpus.get() for _ in range(num_gpus)])
                    # Run the given command asynchronously; the worker updates the shared job status dict.
                    pool.apply_async(worker, args=(command, gpu_ids, available_gpus, job_id, done_jobs))
                else:
                    # Not enough GPU resources; skip this command and retry it later.
                    print(f"Skipping '{command}', not enough GPUs available ({num_gpus} > {num_available_gpus}).")
                    # Wait a while before checking again.
                    time.sleep(args.interval_for_waiting_gpu)

            # Wait a while before checking again.
            time.sleep(args.interval_for_loop)

        # Close the process pool and wait for all tasks to finish.
        pool.close()
    except KeyboardInterrupt:
        print("[CAUGHT KEYBOARDINTERRUPT, TERMINATING WORKERS!]")
        pool.terminate()
    finally:
        pool.join()
        manager.shutdown()
    print("[ALL COMMANDS HAVE BEEN COMPLETED!]")

if __name__ == "__main__":
    freeze_support()
    main()
BitCalSaul commented 4 months ago

Hi, I tried the branch dev and found that it works well: now I can use ctrl+c to close all the processes related to the jobs. However, I use omegaconf to set the command for each job, and there is a new issue when I use RunIt from dev with my framework.

- name: job1
  command: "python /home/jwq/Compressor/main.py epochs=300"
  num_gpus: 1

This works well.

- name: job1
  command: "python /home/jwq/Compressor/main.py epochs=300 field='["temperature"]'"
  num_gpus: 1

This does not work. The output is shown below:

[YOUR CONFIG]
Namespace(cmd_pool='/home/jwq/RunIt/ProjCompressor/config.txt', gpu_pool=[0, 1, 2, 3, 4, 5, 6, 7], interval_for_loop=1, interval_for_waiting_gpu=3, max_workers=1)
Traceback (most recent call last):
  File "/home/jwq/RunIt/run_it.py", line 123, in <module>
    main()
  File "/home/jwq/RunIt/run_it.py", line 67, in main
    jobs = yaml.safe_load(f)
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/__init__.py", line 125, in safe_load
    return load(stream, SafeLoader)
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/__init__.py", line 81, in load
    return loader.get_single_data()
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/constructor.py", line 49, in get_single_data
    node = self.get_single_node()
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/composer.py", line 36, in get_single_node
    document = self.compose_document()
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/composer.py", line 55, in compose_document
    node = self.compose_node(None, None)
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/composer.py", line 82, in compose_node
    node = self.compose_sequence_node(anchor)
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/composer.py", line 111, in compose_sequence_node
    node.value.append(self.compose_node(node, index))
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/composer.py", line 84, in compose_node
    node = self.compose_mapping_node(anchor)
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/composer.py", line 127, in compose_mapping_node
    while not self.check_event(MappingEndEvent):
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/parser.py", line 98, in check_event
    self.current_event = self.state()
  File "/data/jwq/miniconda3/envs/compressor/lib/python3.8/site-packages/yaml/parser.py", line 438, in parse_block_mapping_key
    raise ParserError("while parsing a block mapping", self.marks[-1],
yaml.parser.ParserError: while parsing a block mapping
  in "/home/jwq/RunIt/ProjCompressor/config.txt", line 1, column 3
expected <block end>, but found '<scalar>'
  in "/home/jwq/RunIt/ProjCompressor/config.txt", line 2, column 69
BitCalSaul commented 4 months ago

I am also wondering if we could set up each job with just one line in config.txt. Since sometimes I need to run hundreds of tasks, it would be much easier for me to generate these commands for the jobs by copy and paste.

lartpang commented 4 months ago

@BitCalSaul

You can try this form:

- {name: job1, command: "python /home/jwq/Compressor/main.py epochs=300 field='[\"temperature\"]'", num_gpus: 1}

as in:

https://github.com/lartpang/RunIt/blob/8957feb01e718d45a1f45cc9d828a47bdc14c3e3/examples/config.yaml#L16-L17
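If you need hundreds of such one-line entries, you can also generate the config with a short script instead of writing it by hand. A minimal sketch (not part of RunIt; the file name, paths, and parameter values are only placeholders), which also spares you from escaping the quotes manually:

import yaml

# Hypothetical sweep: one job per epoch setting; adapt the command template to your own project.
jobs = [
    {
        "name": f"job{i}",
        "command": f"python /home/jwq/Compressor/main.py epochs={epochs} field='[\"temperature\"]'",
        "num_gpus": 1,
    }
    for i, epochs in enumerate((100, 200, 300), start=1)
]

with open("config.yaml", "w", encoding="utf-8") as f:
    # default_flow_style=None keeps each job mapping on a single line,
    # and safe_dump takes care of the quoting that is easy to get wrong by hand.
    yaml.safe_dump(jobs, f, sort_keys=False, default_flow_style=None, width=4096)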

BitCalSaul commented 4 months ago

Thx, I will try it :) I am wondering about the point of num_gpus. It seems like RunIt holds a queue of jobs, and once a job is done, the released GPUs are reassigned to the waiting jobs in the queue. Do I understand correctly?

lartpang commented 4 months ago

@BitCalSaul

Yes.

Here's a simple flowchart (drafted by GPT-4) that outlines the core process:

graph TD
    A[Start] --> B[Read Configuration and Command Pool]
    B --> C[Initialize Shared Resources]
    C --> |Maximum number of requirements met| D[Loop Until All Jobs Done]
    D --> E[Check Available GPUs]
    E -->|Enough GPUs| F[Run Job in Separate Process]
    E -->|Not Enough GPUs| G[Wait and Retry]
    F --> H[Job Completes]
    F --> I[Job Fails]
    H --> J[Update Job Status and Return GPUs]
    I --> J
    G --> D
    J -->|All Jobs Done| K[End]
    C -->|Maximum number of requirements not met| L[Terminate Workers]
    L --> M[Shutdown Manager and Join Pool]
    M --> K
lartpang commented 4 months ago

@BitCalSaul

Thanks for the positive feedback, I'll push the latest version to the main branch!

BitCalSaul commented 4 months ago

It seems amazing. Here are some small questions:

1: Could a user use this tool to run a list of jobs even if the GPU pool cannot run them all at the same time? Since I saw there are "wait and retry" and "return GPUs", it seems like RunIt will smartly run some jobs and then run the remaining jobs based on the utilization of the GPU pool.

2: Could I give each job the same name? Considering I run hundreds of small jobs, it's inconvenient to rename them one by one.

3: When we say "Interrupted", does it mean the interruption by ctrl-c? And will it shut down all the running jobs and kill the remaining jobs (or rather, the RunIt process), or just one job?

4: Will you consider adding a GUI for RunIt, just like nvitop, which I believe would be fancier and easier for monitoring jobs?

Thank you for your contribution. It's a really good tool considering its convenience for researchers :)

BitCalSaul commented 4 months ago

Sometimes I run similar tasks, which means I know how much memory each task will take. If all GPUs are occupied but still have some memory left that could run another task, there would be some waste. Thus I'm wondering if there is any threshold in RunIt to identify whether a GPU is idle or busy, or a third state, "could still work more".

lartpang commented 4 months ago

@BitCalSaul

1: Could a user use this tool to run a list of jobs even if the GPU pool cannot run them all at the same time? Since I saw there are "wait and retry" and "return GPUs", it seems like RunIt will smartly run some jobs and then run the remaining jobs based on the utilization of the GPU pool.

Yes. This script determines whether a task can be run based on whether the number of GPUs currently available is greater than or equal to the number of GPUs the task needs. So if there are enough GPUs, it is possible to run multiple tasks at the same time. Tasks that cannot be run at the moment will be executed later, when enough GPUs become available (e.g., when a previous task has finished running and freed up its GPUs). It's worth noting that the script currently loops through the jobs in the order they appear in the config file. This design ensures that GPU resources are fully utilized, but it can also mean that a task that needs many GPUs may be executed very late.

2: Could I give each job the same name? Considering I run hundreds of small jobs, it's inconvenient to rename them one by one.

Yes, the name in the config YAML file is simply a marker used for human readability.

3: When we say "Interrupted", does it mean the interruption by ctrl-c? And will it shut down all the running jobs and kill the remaining jobs (or rather, the RunIt process), or just one job?

(I tweaked the flowchart to make it more intuitive:)

"Interrupted" on the right of the flowchart refers to the following check, after which the main process quits:

    done_jobs = manager.dict()
    for job_id, job_info in enumerate(jobs):
        if job_info["num_gpus"] > len(args.gpu_pool):
            raise ValueError(f"The number of gpus in job {job_id} is larger than the number of available gpus.")
        done_jobs[job_id] = STATUS.WAITING

4: Will you consider adding a GUI for RunIt, just like nvitop, which I believe would be fancier and easier for monitoring jobs?

Thanks for the suggestion. The script itself came from a need in my own day-to-day research to run a series of different deep learning experiments. The strength of this script lies in its simplicity: it can be inserted into your project extremely easily.

A TUI interface similar to nvitop is really worth trying, though. But it's a bit more complicated: what information to display, how to lay it out, how to interact with the user, and so on. With the help of GPT :> it probably wouldn't be too hard, but it would also take more time. Of course, your help and contributions would be very welcome. :>

lartpang commented 4 months ago

@BitCalSaul

Sometimes I run similar tasks, which means I know how much memory each task will take. If all GPUs are occupied but still have some memory left that could run another task, there would be some waste. Thus I'm wondering if there is any threshold in RunIt to identify whether a GPU is idle or busy, or a third state, "could still work more".

Currently, only one task runs on one GPU at a time. If you want to use a threshold to determine whether a GPU is available for continued use by other tasks, this judgment additionally requires considering whether the GPU memory footprint of the program already running there has stabilized. There was such a feature in an early version of this script, but I removed it later because its judgment process was not flexible enough. Still, it's an important requirement that needs to be considered carefully.
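For reference, such a threshold check could be sketched with nvidia-ml-py; this is only an illustration of the idea (the function name and threshold value below are placeholders), not the feature that was removed, and a single snapshot can still be misleading while a job is still allocating memory:

import pynvml  # provided by the nvidia-ml-py package

def gpus_with_free_memory(gpu_ids, min_free_mib=10_000):
    # Return the GPUs from gpu_ids whose currently free memory exceeds the threshold.
    pynvml.nvmlInit()
    try:
        usable = []
        for gpu_id in gpu_ids:
            handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_id)
            mem = pynvml.nvmlDeviceGetMemoryInfo(handle)  # .free is in bytes
            if mem.free / 1024 ** 2 >= min_free_mib:
                usable.append(gpu_id)
        return usable
    finally:
        pynvml.nvmlShutdown()

# e.g. gpus_with_free_memory([0, 1, 2, 3]) -> the subset of these GPUs with at least ~10 GiB free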

BitCalSaul commented 4 months ago

Yes, the name in the config YAML file is simply a marker used for human readability.

Thanks for the reply. RunIt is based on Python, right? I will see if I can understand the code and make some changes when I have free time. Thank you.

BitCalSaul commented 4 months ago

@BitCalSaul

Sometimes I run similar tasks, which means I know how much memory each task will take. If all GPUs are occupied but still have some memory left that could run another task, there would be some waste. Thus I'm wondering if there is any threshold in RunIt to identify whether a GPU is idle or busy, or a third state, "could still work more".

Currently, only one task runs on one GPU at a time. If you want to use a threshold to determine whether a GPU is available for continued use by other tasks, this judgment additionally requires considering whether the GPU memory footprint of the program already running there has stabilized. There was such a feature in an early version of this script, but I removed it later because its judgment process was not flexible enough. Still, it's an important requirement that needs to be considered carefully.

Yes, for the version at that commit, I usually run RunIt twice with two different configs, since I know each of my tasks will only take about 10% of one GPU's memory, which lets me run twice as many jobs on the same number of GPUs. For users who are not familiar with their jobs, or for jobs whose memory usage may change during training, the current way is the safest. For users who know the workload of their jobs and whose memory usage does not change during training, the threshold may help.

lartpang commented 4 months ago

Yes, there is room for improvement in this tool.

BitCalSaul commented 1 week ago

@lartpang Hi, hope this finds you well! Now I'm running multiple small tasks on my GPUs. Since my tasks are stable and I know the memory they need, to achieve the best efficiency I usually run RunIt twice. I'm wondering if you have recently added a module that lets one GPU run multiple tasks simultaneously. Thx.

lartpang commented 1 week ago

Hi, @BitCalSaul

Thanks for your idea. I'll try to implement this in the next couple of days.

lartpang commented 1 week ago

Hi, @BitCalSaul I have committed a new script runit_based_on_memory.py. Could you see if the script does what you need now? Be aware that with the current update, you need to configure your GPU information by modifying the config file.

BitCalSaul commented 1 week ago

Hi, @BitCalSaul I have committed a new script runit_based_on_memory.py. Could you see if the script does what you need now? Be aware that with the current update, you need to configure your GPU information by modifying the config file.

Thx, I will give it a try. BTW, there may be a typo in your README; it should be runit_based_on_memory.py.

I have a question regarding the two arguments, "interval-for-waiting-gpu" and "interval-for-loop."

For the first argument, I understand it as the time interval between consecutive checks to determine if a specific GPU has available memory exceeding the set memory threshold.

For the second argument, does it refer to the time interval between consecutive jobs? I'm not entirely clear on its purpose. Could you please clarify?

BitCalSaul commented 1 week ago

I have finished the test, thanks for your new feature. Now I could change the config to make full use of my GPUs.

I have two following questions:

  1. It seems like the code works from an estimate of GPU memory instead of directly reading the memory from the NVIDIA driver. This means that if someone is using GPU 1 with 30% of its memory, I need to change my config for that GPU to (100-30)=70% memory, or my program may run into an out-of-memory issue.
  2. It seems "ctrl+c" for RunIt cannot kill the job processes initiated by RunIt.
lartpang commented 1 week ago

@BitCalSaul

For the first question, we can use nvidia-ml-py (https://pypi.org/project/nvidia-ml-py/). But for the second question, there is no good way to solve it at the moment. I've tried to catch the exceptions as much as possible. :< Maybe it's because I'm not very skilled at developing multi-process programs, and some of the details may not be handled very well.
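For the second question, one commonly used technique (not something the current script does; this is only a sketch, with placeholder cmd and env standing in for the worker's variables) is to start each command as the leader of its own process group and then signal the whole group, so that children spawned by the shell or by torchrun are terminated together:

import os
import signal
import subprocess

cmd = "python train.py"   # placeholder command
env = os.environ.copy()   # placeholder environment, as in the worker above

# Launch the job in a new session, making it the leader of its own process group.
sub_proc = subprocess.Popen(cmd, shell=True, env=env, start_new_session=True)

def kill_job(proc: subprocess.Popen, sig: int = signal.SIGTERM) -> None:
    # Signal the whole process group rather than only the shell, so that
    # grandchildren (e.g. workers spawned by torchrun) are terminated as well.
    os.killpg(os.getpgid(proc.pid), sig)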

BitCalSaul commented 1 week ago

Hi @lartpang Thank you for your reply. I remember you solved the second question before, please check here https://github.com/lartpang/RunIt/issues/3#issuecomment-2053291127. Is this situation different from now?

I am frequently using the code from commit 056b1cfd5aa1cf2c4beea192868c5c02463ce6d3 because it fits my needs well. However, I've encountered a small issue: even when specifying the gpu-pool argument, Runit still occasionally runs my jobs on GPUs that I haven't assigned to the pool. This is problematic because sometimes my labmate runs code with dynamic memory usage, so even if Runit checks the GPU and sees enough memory available at the start, the situation can change quickly, leading to out-of-memory errors. Could you please take a look into this?

lartpang commented 1 week ago

@BitCalSaul In the latest commit, the new script also uses these strategies :>...

BitCalSaul commented 1 week ago

@BitCalSaul In the latest commit, the new script also uses these strategies :>...

I didn't get the point; what kind of strategy? I thought the gpu-pool argument would have worked, but it didn't.

lartpang commented 1 week ago

@BitCalSaul

It is about this:

I remember you solved the second question before, please check here https://github.com/lartpang/RunIt/issues/3#issuecomment-2053291127. Is this situation different from now?


However, I've encountered a small issue: even when specifying the gpu-pool argument, Runit still occasionally runs my jobs on GPUs that I haven't assigned to the pool.

About this, em..., maybe you can try the new script in the latest commit: https://github.com/lartpang/RunIt/blob/727543d503f6ccab5766947817a100d951b90795/runit_based_on_detected_memory.py

BitCalSaul commented 1 week ago

Thanks for your clarification!

BitCalSaul commented 1 week ago

Hi, thanks for your new features. RunIt is smart since it can detect the real-time memory usage, and now my GPUs can be fully used :) That's exactly what I wanted!

BTW, I manually kill my processes on the GPU since "ctrl+c" doesn't work. I am wondering if you have any recommended shortcut to kill the GPU processes of a specific user. I have found similar questions, but the answers didn't work for me.