[Feature] Feedback on Ray Job API #22043

Closed: msaroufim closed this issue 2 years ago

msaroufim commented 2 years ago

Hi everyone,

I've been using the Ray Job API and ray up as part of my work building a Ray scheduler for torchX.

I will preface the below by saying that I think the Ray Job API is a great feature, but it needs a bit more work to make it as great to use as the rest of Ray.

I have some feedback to improve its usability and one major limitation to point out, which @amogkam and @jiaodong suggested I share.

Major limitation

Right now it seems like working_dir has a 100MB limit, which is problematic since it means I don't have a clean story for loading training data, binaries, and extra files like label key-value stores. This limits how "real" a use case this feature can support, because even introductory Kaggle problems have datasets larger than 100MB.

Minor limitations and suggested improvements

amogkam commented 2 years ago

Thanks for the thorough details @msaroufim! This is great feedback for us. Also cc @edoakes @AmeerHajAli

edoakes commented 2 years ago

Thanks so much for the feedback @msaroufim, this is really helpful!

On the point about working_dir size limitation, I don't think we're likely to increase this because it's not intended to be used for very large files (e.g., datasets in Kaggle). The best practice for these is to store them in cloud storage (e.g., S3, HDFS) and load them in the application.
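
For illustration, a minimal sketch of that pattern (the bucket path is hypothetical, and Ray Data is just one of several ways to read it); only the code goes through working_dir, while the data stays in cloud storage:

import ray

ray.init(address="auto")

# Read the training data directly from S3 at runtime instead of shipping it
# via working_dir; "s3://my-bucket/train.parquet" is a placeholder path.
ds = ray.data.read_parquet("s3://my-bucket/train.parquet")
print(ds.count())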

I had a few more questions about your other points:

msaroufim commented 2 years ago

Thanks @edoakes

On the point about working_dir size limitation, I don't think we're likely to increase this because it's not intended to be used for very large files (e.g., datasets in Kaggle). The best practice for these is to store them in cloud storage (e.g., S3, HDFS) and load them in the application.

This actually sounds fine; it might just need to be mentioned in the docs or source code for working_dir.

What do you mean by "a more natural way to submit distributed jobs?" I'm not sure what you mean by "distributed" here -- the job can be a normal Ray program that runs actors & tasks across the cluster, is there another type of "distributed job" you're looking for?

Added more detail here: there are scripts, like in PyTorch distributed, where you need to set environment variables like rank and world size. I had to do this to make things work.
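
For concreteness, a minimal sketch of the kind of setup meant here (this is not the actual torchX scheduler code; the two-worker setup, master address, and port are made up for the example):

import os

import ray
import torch.distributed as dist

@ray.remote
def worker(rank, world_size, master_addr, master_port):
    # torch.distributed's default "env://" init method reads these variables;
    # MASTER_ADDR must point at the node where the rank-0 worker runs.
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = str(master_port)
    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)
    dist.init_process_group(backend="gloo")
    return dist.get_rank()

ray.init(address="auto")
world_size = 2
print(ray.get([worker.remote(r, world_size, "10.0.0.1", 29500)
               for r in range(world_size)]))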

igorgad commented 2 years ago

I'm particularly excited about the ray job API as an opportunity to move from clusters dedicated to a specific project to a generic cluster that could handle multiple workloads across the entire team.

Allow me to add a couple of points:

  1. It should be easier to run certain scripts on specific nodes, like a non-Ray training script on a GPU node. I have been using the following workaround for this particular case, but IMO it should be integrated with the API.
import json
import os
import subprocess
import time

import ray

# These mirror internal Ray constants used by the job manager at the time;
# the exact values/import paths are version-dependent, so they are inlined here.
RAY_JOB_CONFIG_JSON_ENV_VAR = "RAY_JOB_CONFIG_JSON_ENV_VAR"
RAY_ADDRESS_ENVIRONMENT_VARIABLE = "RAY_ADDRESS"
SUBPROCESS_POLL_PERIOD_S = 1  # seconds between child-process polls

@ray.remote
def remote_executor(command):
    # Propagate the runtime_env and cluster address to the child process so a
    # non-Ray script (or a nested Ray script) inherits the job's environment.
    runtime_env = ray.get_runtime_context().runtime_env
    ray_address = ray._private.services.get_ray_address_to_use_or_die()  # private API
    os.environ[RAY_JOB_CONFIG_JSON_ENV_VAR] = json.dumps({"runtime_env": runtime_env})
    os.environ[RAY_ADDRESS_ENVIRONMENT_VARIABLE] = ray_address
    child_process = subprocess.Popen(command,
                                     shell=True,
                                     start_new_session=True)

    # Fate sharing: if this worker process dies, kill the child's process group.
    parent_pid = os.getpid()
    child_pid = child_process.pid
    child_pgid = os.getpgid(child_pid)
    subprocess.Popen(
        f"while kill -s 0 {parent_pid}; do sleep 5; done; kill -9 -{child_pgid}",  # noqa: E501
        shell=True,
        # Suppress output
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    # Poll until the child finishes, then return its exit code.
    while True:
        return_code = child_process.poll()
        if return_code is not None:
            return return_code
        time.sleep(SUBPROCESS_POLL_PERIOD_S)

# num_cpus, num_gpus, gpu_type, conda_path, jobid, pg (a placement group) and
# entrypoint (the shell command to run) are supplied by the caller.
executor = remote_executor.options(num_cpus=num_cpus,
                                   num_gpus=num_gpus,
                                   resources={gpu_type: num_gpus},
                                   runtime_env={'conda': conda_path},
                                   name=jobid,
                                   max_retries=0,
                                   placement_group=pg)
ray.get(executor.remote(entrypoint))
  2. Having something like ray job list output a table would be cool for checking queued and running jobs. Cheers!
jiaodong commented 2 years ago

@igorgad thanks for your input! For 2) we have an issue open for this and it shouldn't be hard to add: https://github.com/ray-project/ray/issues/21956

For 1) I actually think it's already supported. Reading through the code snippet you have, it highly overlaps with what the current job manager is already doing, such as using an executor actor to manage the lifecycle of a job and using the pgid to implement subprocess fate sharing.

In order to achieve "run certain scripts on specific nodes, like a non-ray training script on a GPU node", I think you essentially need to make your entrypoint script a Ray script that kicks off tasks/actors with pinned resources via .options(), and submit it with the runtime_env you want to use. Under the hood the "executor actor" will run on the head node, but subsequent execution happens on your current Ray cluster and respects your script's resource constraints.

Please let me know if my understanding is correct, or if we can make any documentation improvements in https://docs.ray.io/en/master/cluster/job-submission.html

igorgad commented 2 years ago

Hey @jiaodong, thanks for your answer.

  1. Yeah, the snippet I shared was highly inspired by the JobSupervisor, and that's what I'm doing at the moment for job placement. If I understood correctly from your answer, users must handle the placement of their jobs by creating a ray job to act as a proxy on the desired node. However, this approach still requires some knowledge of the Ray Core API from the user, which may limit the adoption of Ray in general. I wonder if we could launch the JobSupervisor directly on the desired node by providing the requested resources in the Job CLI/SDK/API. Perhaps here

  2. Great, do you think it will arrive in 1.10?

Thanks a lot. Cheers!

jiaodong commented 2 years ago

If I understood correctly from your answer, users must handle the placement of their jobs by creating a ray job to act as a proxy on the desired node.

Hmm, not quite -- I assume the "ray job to act as proxy" is the JobSupervisor actor in the job manager. It always runs on the head node, since its .options() requests a Ray cluster resource named after the head node, which pins it there. It's a very lightweight actor that's primarily responsible for setting up the runtime_env for the job and providing a way for the job manager to look into and manage the job's lifecycle.
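
For reference, a hedged sketch (not the actual JobSupervisor code) of how an actor can be pinned to a particular node via the per-node "node:<ip>" resource:

import ray

ray.init(address="auto")

# Each Ray node advertises a custom resource named "node:<its ip>"; requesting
# a tiny amount of it pins a task or actor to that node. Here we just pick the
# first alive node; the job manager does the equivalent for the head node.
node = [n for n in ray.nodes() if n["Alive"]][0]
node_key = [r for r in node["Resources"] if r.startswith("node:")][0]

@ray.remote
class Supervisor:
    def ping(self):
        return "pinned"

sup = Supervisor.options(resources={node_key: 0.001}).remote()
print(ray.get(sup.ping.remote()), "on", node_key)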

It's important to separate the JobSupervisor actor from the entrypoint script, as the latter is the meat of your application and can be an arbitrary shell script, Python script, or Python script using Ray. What you put in the entrypoint script determines your task/actor placement, rather than how you configure the JobSupervisor.

For a concrete example, let's say I have a ray cluster running on a physical cluster with both CPU and GPU nodes, and I want my application to run on GPU nodes only.

entrypoint.py

import os

import ray

ray.init(address="auto")

# Requesting num_gpus=1 means Ray will only schedule this task on a node with
# a free GPU.
@ray.remote(num_gpus=1)
def fn_use_gpu():
    print("ray.get_gpu_ids(): {}".format(ray.get_gpu_ids()))
    print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

# Without this call the script would only define the task, never run it.
ray.get(fn_use_gpu.remote())

In this script, our function fn_use_gpu will run on a GPU node with just a trivial job submission via the SDK/CLI/REST API to the Ray cluster address. This is not because we provided a special setting on the job actor like resources={gpu_type: num_gpus}, but because fn_use_gpu declares its own resource constraint and Ray will look for a node in the cluster where the target resource is available. This logic is exactly the same as if you were running a local Ray script on a server that happens to have GPUs.
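
For completeness, a hedged sketch of what that trivial job submission could look like with the Python SDK (the import path follows the current Ray Jobs docs and differed in the 1.9-era SDK; the address and working_dir below are placeholders):

from ray.job_submission import JobSubmissionClient

# Point the client at the cluster's dashboard address (placeholder below).
client = JobSubmissionClient("http://127.0.0.1:8265")
submission_id = client.submit_job(
    entrypoint="python entrypoint.py",
    # Only the code goes through working_dir; resources are requested inside
    # entrypoint.py itself via @ray.remote(num_gpus=1).
    runtime_env={"working_dir": "./"},
)
print(submission_id)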

jiaodong commented 2 years ago

For the ray job list API, sadly it won't be in 1.10 :/ The branch cut was made much earlier and 1.10 was in fact just released. It's not a very difficult PR though, and the team might just implement it someday. cc @architkulkarni, who will have a better idea about the detailed plan for the next steps of OSS job submission.

architkulkarni commented 2 years ago

We expect to have ray job list in Ray 1.12 (mid-March release). @igorgad does Jiao's latest explanation make sense? I'm working on adding more documentation for Jobs and will probably add something related to this question, so feedback here is appreciated!

igorgad commented 2 years ago

Hi, sorry for the late reply.

Yes, it makes sense, and that's what I have been doing. Considering that all cluster managers (Slurm, k8s, etc.) require some sort of custom script to launch jobs on specific nodes, using the entrypoint example given above is super simple.

dynamicwebpaige commented 2 years ago

Update on requirements.txt support:

https://github.com/ray-project/ray/commit/52a722ffe7d255e12cfab3352d442cbca362af44
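
As a hedged illustration of what that support enables (see the linked commit for the authoritative details; the file path below is a placeholder):

import ray

# "pip" in runtime_env may point at a requirements.txt file, so per-job Python
# dependencies don't need to be baked into the cluster environment.
ray.init(runtime_env={"pip": "./requirements.txt"})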

stale[bot] commented 2 years ago

Hi, I'm a bot from the Ray team :)

To help human contributors focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public slack channel.

stale[bot] commented 2 years ago

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!