ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Ray component: Core] Add way to fix problems with ray::IDLE workers taking up too many resources #27499

Open joshua-cogliati-inl opened 2 years ago

joshua-cogliati-inl commented 2 years ago

Description

Ray has --num-cpus to limit the number of CPUs used on a node, but this does not limit the number of ray::IDLE workers that exist. For example, after running ray start --address=10.159.8.149:53454 --num-cpus 4, I have seen over 90 ray::IDLE workers created. Each of these workers uses CPU and memory, which results in significant resource use.

On Stack Overflow, use of ray.init(local_mode=True) was suggested (https://stackoverflow.com/a/63231293/18954005), but that basically removes parallelism.

An alternative workaround is to use --min-worker-port and --max-worker-port to restrict the number of workers, but if ports are already in use by some other process, that can cause fewer workers to be created than desired.

Use case

I would like to be able to limit the resources used by ray::IDLE workers.
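
For reference, here is a quick way to quantify what the ray::IDLE workers on a node are consuming. This is a sketch using psutil (not part of the original report); it assumes the idle workers show up with a process title starting with "ray::IDLE", as seen in ps.

import psutil

# Sum up resident memory used by ray::IDLE workers on this node. psutil is
# assumed to be installed; the "ray::IDLE" title match mirrors what ps shows.
idle = []
for p in psutil.process_iter(["pid", "name", "cmdline", "memory_info"]):
    title = " ".join(p.info["cmdline"] or []) or (p.info["name"] or "")
    if title.startswith("ray::IDLE") and p.info["memory_info"] is not None:
        idle.append(p)

total_rss = sum(p.info["memory_info"].rss for p in idle)
print(f"{len(idle)} ray::IDLE workers, {total_rss / 1024**2:.1f} MiB RSS total")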

joshua-cogliati-inl commented 2 years ago

(Possibly if there is a state where ray::IDLE workers are actually waiting to pass data or something else, they should be marked ray::Almost_IDLE or something, so that the user can figure out what is happening from looking at the output of ps.)

stephanie-wang commented 2 years ago

We cannot put a hard cap on the number of workers since this can potentially deadlock if you have a deeply nested application, so I'm afraid this feature is unlikely to be supported. The recommended way to impose a soft cap is to use --num-cpus, as you said.

That said, this is definitely abnormal:

So for example after running: ray start --address=10.159.8.149:53454 --num-cpus 4 I have seen over 90 ray::IDLE workers created.

Can you please provide a reproduction script so we can dig into this further?

joshua-cogliati-inl commented 2 years ago

Hm, I don't have a simple reproduction script, but what we are roughly doing is calling ray.init in one python program, and then it runs multiple ray.remotes, and in the ray remotes, they do a subprocess call to another python program that does another ray.init and then calls its own ray.remotes. I think that is required to get to levels like 90.

That said, if you want to see ray::IDLE workers that never get cleaned up, the reproduction scripts in #28071 work. Here they are in one listing:

Create environment:

conda create --name ray2
conda activate ray2
conda install --name ray2 pip
pip install ray==2.0.*

ray_start.sh

#!/bin/bash

source /home/fred/miniconda3/etc/profile.d/conda.sh
conda activate ray2

NUM_CPUS=$1
HEAD_ADDRESS=$2

ray start --verbose --address=$HEAD_ADDRESS --num-cpus $NUM_CPUS  --min-worker-port 10002 --max-worker-port $((10002+8*$NUM_CPUS))

SSH to one of the nodes to run on, and run this Python script (listen_range.py):

import socket
import time

l = []
for PORT in range(10002, 10010+1):
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(('', PORT))
        s.listen(1)
        l.append(s)
    except OSError:
        print("unable to get ",PORT)

if len(l) > 0:
    input("Press enter to release ports:")

Script to start ray ($PBS_NODEFILE lists the nodes to run on)

ray start --head --num-cpus=1 --port=0 --min-worker-port 10002 --max-worker-port 10010

#Adjust this based on output of ray start
export RAY_ADDRESS='10.159.5.15:65512'

for NODE in `cat $PBS_NODEFILE | uniq`; do
    if echo $NODE | grep -q `hostname`; then
        echo skipping $NODE;
    else
       echo $NODE
       ssh $NODE /home/fred/ray_start.sh 1 $RAY_ADDRESS
    fi
done

Python script to test ray:

import ray
import time
import gc
ray.init(address='auto')

start = time.time()

@ray.remote
def f(x):
    a = [2**2**2**2**2 for x in range(100000)]
    return x * x

print("starting remote functions")
futures = [f.remote(i) for i in range(80)]
print("remotes started")
left = len(futures)
outputs = [None]*left
while left > 0:
    for i in range(len(futures)):
        if futures[i] is not None:
            try:
                f = futures[i]
                runReturn = ray.get(f, timeout=1e-10)
                print(i, runReturn)
                left += -1
                outputs[i] = runReturn
                futures[i] = None
                gc.collect()
                print("referrers", len(gc.get_referrers(f)))
                del f
                gc.collect()
            except ray.exceptions.GetTimeoutError:
                pass

print(outputs) # [0, 1, 4, 9]
end = time.time()
print("time: ",end - start)

Basically, listen_range.py on a node blocks that node from creating workers, and eventually you end up with ray::IDLE on lots of nodes and no progress being made. I suspect this simplified reproduction script shows at least part of the problem we are encountering that is causing lots of ray::IDLE's.

joshua-cogliati-inl commented 2 years ago

And if you want me to try to replicate the 90 idle workers, I can try to make a small replication script that does that. I suspect it is just the above with an outer Python program that calls ray.init and then calls subprocess on an inner program that runs the computational work.

joshua-cogliati-inl commented 2 years ago

We cannot put a hard cap on the number of workers since this can potentially deadlock if you have a deeply nested application, so I'm afraid this feature is unlikely to be supported.

Um, correct me if I am wrong, but doesn't this mean that ray potentially deadlocks with a deeply nested application, since any physical machine has some actual hard limit for the number of workers? It sounds like ray may need some way to error out in that case. (This may be difficult to fix...)

stephanie-wang commented 2 years ago

Hm, I don't have a simple reproduction script, but what we are roughly doing is calling ray.init in one python program, and then it runs multiple ray.remotes, and in the ray remotes, they do a subprocess call to another python program that does another ray.init and then calls its own ray.remotes. I think that is required to get to levels like 90.

Each time you call ray.init(), it starts another "driver" Ray program, like calling a main() function but for a distributed program. In Ray, we don't share workers across drivers for isolation purposes. I believe this is the most likely reason that you're seeing so many IDLE workers.

The best practice here is to try to keep the whole application under the same driver. In the ray.remote calls, is it possible to call the other tasks directly instead of through a subprocess that calls ray.init()? This should reduce the total number of workers needed to run the job.
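
To illustrate the suggested restructuring, here is a minimal sketch (function names are hypothetical, not from the reporter's application): the work that the subprocess would have done becomes nested Ray tasks under the original driver, so there is no second ray.init() and no second pool of workers.

import ray

ray.init(address="auto")

@ray.remote
def inner_work(j):
    # Work that previously ran in a subprocess that called ray.init() again.
    return j * j

@ray.remote
def outer_task(i):
    # Nested tasks run under the same driver, so the existing worker pool is
    # reused instead of a second driver starting its own workers.
    refs = [inner_work.remote(j) for j in range(4)]
    return i, sum(ray.get(refs))

print(ray.get([outer_task.remote(i) for i in range(4)]))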

Um, correct me if I am wrong, but doesn't this mean that ray potentially deadlocks with a deeply nested application, since any physical machine has some actual hard limit for the number of workers? It sounds like ray may need some way to error out in that case. (This may be difficult to fix...)

It wouldn't deadlock, but yes potentially Ray would crash if you had an extremely deeply nested application. You may bump into this if you have a recursive program that nests to depth ~1000 or more, which in practice we haven't seen very often. Ray's scheduler will try to avoid this situation by prioritizing finishing a nested branch instead of starting a new one.
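
To make the depth point concrete, here is an illustrative sketch (not from the thread) of the kind of recursion that needs roughly one worker per level, because each level stays blocked in ray.get() while the level below runs:

import ray

ray.init()

@ray.remote
def nested(depth):
    if depth == 0:
        return 0
    # This worker stays alive (blocked in ray.get) until the whole chain
    # below it finishes, so depth D requires roughly D workers.
    return 1 + ray.get(nested.remote(depth - 1))

print(ray.get(nested.remote(20)))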

joshua-cogliati-inl commented 2 years ago

I agree in principle that it would be nice to eliminate the subprocess call. We discuss that ~every 6 months, but it is non-trivial. And for what it is worth, the subprocesses do exit, and to get to 90, they probably have to not be cleaned up after exiting. We also set it up so that there should be approximately one task per cpu.

That said, the replication in https://github.com/ray-project/ray/issues/27499#issuecomment-1246894172 does not use a subprocess call, and has ray::IDLE's that take substantially longer than 1 minute to clean up.

stephanie-wang commented 2 years ago

@joshua-cogliati-inl are you okay closing this issue? We cannot provide the requested feature to hard-cap workers, but feel free to open a new issue or rename this one to something else.

joshua-cogliati-inl commented 2 years ago

Hm, would "ray workers not being cleaned up if node blocked" be an acceptable renaming? (And if there is another way to do the fault injection quickly besides min/max worker ports I would be happy to try a replication script with that instead.)

joshua-cogliati-inl commented 2 years ago

(If #28071 was renamed to "[Core] Ray may hang if workers fail to start", I would be fine with closing this one, because this one is about preventing ray from failing if the workers use too much memory.)

joshua-cogliati-inl commented 2 years ago

Okay, the actual problem (as the Stack Overflow question with no good answers shows: https://stackoverflow.com/questions/60098624/what-is-rayidle-and-why-are-some-of-the-workers-running-out-of-memory/63231293) is that Ray can often end up in situations where ray::IDLE workers are taking up a lot of resources. I think this may require fixing various bugs (#28071 and #28199, to name two) and possibly improving the ability of users to diagnose why a ray::IDLE is being created. If the Stack Overflow question had a simple answer like "run ray status --debug-idle", then I would consider this closeable. Thank you for the discussion.
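
(For reference, newer Ray releases ship a state API that can at least enumerate worker processes; this is a sketch assuming ray.util.state is available, roughly Ray 2.3+. It lists workers but does not explain why a given one is sitting in ray::IDLE, which is the diagnostic gap described above.)

# Run on a machine attached to the cluster (for example with RAY_ADDRESS set).
from ray.util.state import list_workers

for worker in list_workers():
    print(worker)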

joshua-cogliati-inl commented 2 years ago

Alternative reproducer to get long-lived ray::IDLE's:

import ray
import time
import gc
import threading
ray.init(address='auto')

start = time.time()

class MemUser:
    def __init__(self, v):
        self.v = v
        #[0]*15*1024 takes about 10KB
        self.extra = [0]*15*1024*3000
    def __str__(self):
        return f"MemUser({self.v})"

@ray.remote
def f(x):
    a = [2**2**2**2**2 for x in range(100000)]
    return MemUser(x * x)

print("starting remote functions")
futures = [f.remote(i) for i in range(19)]
print("remotes started")
left = len(futures)
needed_to_finish = len(futures)
outputs = [None]*left

lowest_running = 0
while lowest_running  < needed_to_finish:
    lowest_running = len(futures)
    for i in range(len(futures)):
        if futures[i] is not None:
            if i < lowest_running:
                lowest_running = i
            try:
                fi = futures[i]
                runReturn = ray.get(fi, timeout=1e-10)
                print(i, runReturn, left, len(gc.get_referrers(fi)))
                left += -1
                outputs[i] = runReturn
                futures[i] = None
                del fi
            except ray.exceptions.GetTimeoutError:
                pass

print(outputs) # [0, 1, 4, 9]
print(lowest_running)
end = time.time()
print("time: ",end - start)

Basically, this returns a lot of data from the ray remote (sys.getsizeof([0]*15*1024*3000)/1024**2 ~= 351 MB), and this results in multiple ray::IDLE's existing. Ray is started as in https://github.com/ray-project/ray/issues/27499#issuecomment-1246894172, without blocking a node with listen_range.py. Returning 350 MB of data is sufficient all by itself to cause remotes to get stuck in ray::IDLE. Note that the Python code will never finish the ray remotes. (Update: 35 MB is sufficient to cause this as well.)
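
(Side note: a more idiomatic way to collect results than polling each future with a near-zero ray.get timeout is ray.wait. A minimal, self-contained sketch follows; it is not part of the reproduction and is not claimed to change the ray::IDLE behavior.)

import ray

ray.init(address="auto")

@ray.remote
def g(x):
    return x * x

pending = [g.remote(i) for i in range(80)]
results = []
while pending:
    # ray.wait blocks until at least one object is ready, avoiding a busy
    # loop over every future with a 1e-10 second timeout.
    done, pending = ray.wait(pending, num_returns=1)
    results.extend(ray.get(done))
print(len(results))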

And ray status thinks everything is fine:

ray status
======== Autoscaler status: 2022-09-27 15:44:15.022588 ========
Node status
---------------------------------------------------------------
Healthy:
 1 node_4d1423677438562c9257ec9bd00ad1b4819320ae23bfd676e6fcd48c
 1 node_0bc4002f0b7bba2d36d62035b0a8dfadfbe476b19565c53926184ca9
 1 node_00d5c4d5a3f93c9a624a8c51a7ae927201f491edb5191c228c8b7e2f
 1 node_a8c4adcbf6e65fda3ea48caae6c45acafc85348a9d24022fb70a238b
 1 node_b4417b2f6e6e6de9e74fdcf42aeabf0b309bd25b23674e8076843d63
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/5.0 CPU
 0.00/608.158 GiB memory
 2.58/264.631 GiB object_store_memory

Demands:
 (no resource demands)

joshua-cogliati-inl commented 2 years ago

So ray status goes from:

ray status
======== Autoscaler status: 2022-09-27 16:24:23.842090 ========
Node status
---------------------------------------------------------------
Healthy:
 1 node_f2e49ad16e8598fe9daa2f9511b313a6c4d507e35fa138ed340420b7
 1 node_30b52fa048cebc49718cd03e454abad955eaa5ec33761cdbb552e3f8
 1 node_45fb3da1e646453da25f31f4684eb3f81ddbae9921fb820e5487ae01
 1 node_92df62425b2613090aa812196e78e975014b5b0f90505bd8e2d08807
 1 node_1bcc19dccb67ad299c7ba593efb8323abff38f92f5909b56e1ec65ac
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 5.0/5.0 CPU
 0.00/605.231 GiB memory
 0.00/263.376 GiB object_store_memory

Demands:
 {'CPU': 1.0}: 9+ pending tasks/actors

to

ray status
======== Autoscaler status: 2022-09-27 16:29:19.319481 ========
Node status
---------------------------------------------------------------
Healthy:
 1 node_f2e49ad16e8598fe9daa2f9511b313a6c4d507e35fa138ed340420b7
 1 node_30b52fa048cebc49718cd03e454abad955eaa5ec33761cdbb552e3f8
 1 node_45fb3da1e646453da25f31f4684eb3f81ddbae9921fb820e5487ae01
 1 node_92df62425b2613090aa812196e78e975014b5b0f90505bd8e2d08807
 1 node_1bcc19dccb67ad299c7ba593efb8323abff38f92f5909b56e1ec65ac
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/5.0 CPU
 0.00/605.231 GiB memory
 0.13/263.376 GiB object_store_memory

Demands:
 (no resource demands)

without ray.get ever returning a value, throwing an exception, or writing anything related to this to *.err in the logs directories.

joshua-cogliati-inl commented 2 years ago

@stephanie-wang Is this worth a separate issue?

joshua-cogliati-inl commented 2 years ago

Neat, we got a stack trace with 35 MB return values after waiting long enough (note that it took something like 15 minutes before the "Failed to connect to GCS within 60 seconds" happened):

[2022-09-27 16:49:33,246 C 261192 261210] gcs_rpc_client.h:537:  Check failed: absl::ToInt64Seconds(absl::Now() - gcs_last_alive_time_) < ::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s() Failed to connect to GCS within 60 seconds
*** StackTrace Information ***
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(+0xc3f93a) [0x2aaab304c93a] ray::operator<<()
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(+0xc41442) [0x2aaab304e442] ray::SpdLogMessage::Flush()
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray6RayLogD1Ev+0x37) [0x2aaab304e757] ray::RayLog::~RayLog()
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(+0x6f4a5d) [0x2aaab2b01a5d] ray::rpc::GcsRpcClient::CheckChannelStatus()
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(_ZN5boost4asio6detail12wait_handlerIZN3ray3rpc12GcsRpcClient15SetupCheckTimerEvEUlNS_6system10error_codeEE_NS0_9execution12any_executorIJNS9_12context_as_tIRNS0_17execution_contextEEENS9_6detail8blocking7never_tILi0EEENS9_11prefer_onlyINSG_10possibly_tILi0EEEEENSJ_INSF_16outstanding_work9tracked_tILi0EEEEENSJ_INSN_11untracked_tILi0EEEEENSJ_INSF_12relationship6fork_tILi0EEEEENSJ_INSU_14continuation_tILi0EEEEEEEEE11do_completeEPvPNS1_19scheduler_operationERKS7_m+0x303) [0x2aaab2b01f03] boost::asio::detail::wait_handler<>::do_complete()
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(+0xc5037b) [0x2aaab305d37b] boost::asio::detail::scheduler::do_run_one()
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(+0xc515b1) [0x2aaab305e5b1] boost::asio::detail::scheduler::run()
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(+0xc51820) [0x2aaab305e820] boost::asio::io_context::run()
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorker12RunIOServiceEv+0xcd) [0x2aaab29ef66d] ray::core::CoreWorker::RunIOService()
/home/fred/miniconda3/envs/ray2/lib/python3.10/site-packages/ray/_raylet.so(+0xd7d4d0) [0x2aaab318a4d0] execute_native_thread_routine
/lib64/libpthread.so.0(+0x7ea5) [0x2aaaaacd6ea5] start_thread
/lib64/libc.so.6(clone+0x6d) [0x2aaaab8fab0d] clone

joshua-cogliati-inl commented 2 years ago

Added new issue at: https://github.com/ray-project/ray/issues/28855

joshua-cogliati-inl commented 1 year ago

For what it is worth, I created a test where I start Ray with process A, and then start handing processes B, C, ... the address of the Ray server. B and C and ... run some Ray work and then B finishes, C finishes, ... and still I have tons of ray::IDLE processes from B, C, ... so I think I have ray::IDLE's from processes that finished an hour ago but still haven't been collected.

tongyifan commented 1 year ago

For what it is worth, I created a test where I start Ray with process A, and then start handing processes B, C, ... the address of the Ray server. B and C and ... run some Ray work and then B finishes, C finishes, ... and still I have tons of ray::IDLE processes from B, C, ... so I think I have ray::IDLE's from processes that finished an hour ago but still haven't been collected.

+1. I still use Ray 1.11.1; after testing, it is the latest version on which I cannot reproduce this problem.

joshua-cogliati-inl commented 1 year ago

@stephanie-wang In reply to the Stack Overflow comment, is there any way I can debug ray::IDLE's that continue after the process exits? (Is there something I should call before exiting a process to tell Ray to drop any lingering ray::IDLE's? Is there a way to tell a ray remote that I am done with it, besides deleting it?) I don't have a small test of this, but this is the code that I am referring to in https://github.com/ray-project/ray/issues/27499#issuecomment-1380935436: https://github.com/idaholab/raven/blob/849f70f6ffde7c8f88a251788f4a61d4483f867f/ravenframework/Runners/DistributedMemoryRunner.py
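
(For what it's worth, the documented way for a driver process to disconnect from the cluster is ray.shutdown(); a minimal sketch of calling it before a driver exits is below. Whether this releases the lingering ray::IDLE workers described in this thread is not confirmed here.)

import ray

ray.init(address="auto")
# ... run ray.remote tasks and ray.get their results ...

# Explicitly disconnect this driver before the process exits.
ray.shutdown()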

damonwan1 commented 1 year ago

I have the same question. The Ray state is Idle, but the CPU is at 100% and memory usage is increasing gradually. I am curious about what is happening inside Ray. At the same moment, all of the Ray subprocesses have finished; I have printed the log to confirm.

Then the OOM happened.

stale[bot] commented 1 year ago

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public slack channel.

joshua-cogliati-inl commented 1 year ago

For what it is worth, the only way I have found to decrease the amount of resources that ray::IDLE workers use was to switch to dask :(

rkooo567 commented 1 year ago

note: this is the repro script; https://github.com/ray-project/ray/issues/27499#issuecomment-1260079287

rkooo567 commented 1 year ago

We fixed various issues related to process leaks. We will try to repro with the ^ script and close the issue if it is not happening again.

anyscalesam commented 1 year ago

@scv119 have you had a chance to take a look at this yet?

LilDojd commented 1 year ago

I am also having this issue, and it is blocking. I am calling ray.init() within a Python script, using a class that builds on the DAG API.

Versions tested

Ray is initialized within this method, which is called only once.

def _check_init_ray(self):
    if not ray.is_initialized():
        _context = ray.init(
            ignore_reinit_error=True,
            num_cpus=self._cfg.num_cpus,
            num_gpus=self._cfg.num_gpus,
            storage=str(self._cfg.ray_storage),
        )

The class itself does not store any references to Ray objects internally. The DAG is built by iterating over topologically sorted nodes and creating FunctionNodes for them. Dependencies are inferred internally and supplied in **inputs as top-level arguments:

def _schedule_dag(self, protocoldag: ProtocolDAG) -> 'FunctionNode':

  ...

  for unit in protocoldag.protocol_units:

      requested_resources = self.resolve_resources(unit)

      def store_internal_result(result: Union[ProtocolUnit, ProtocolUnitResult]):
          if isinstance(result, ProtocolUnit):
              return self.nodes[pdag_key][result.key]
          self._internal_client.store_result(result)
          return self._internal_storage.get_object_ref(
              f"results/{'/'.join(result.key.split('-'))}.json"
          )

      inputs = unit.inputs
      inputs = modify_dependencies(
          inputs, store_internal_result, is_my_obj, mode="encode"
      )  # This modifies inputs in place

      inputs = flatten_dict(inputs)

      node = execute_unit_remote.options(
          name=unit.name, **requested_resources
      ).bind(unit_rbfengine_key, self.client, **inputs)

     self.nodes[pdag_key][unit.key] = node
  ... 
     # After building dag and getting terminal node, all internal references to Ray objects are removed:
     for prefix, location in self._internal_client.list_objects():
         self._internal_client.delete(
             f"{prefix}/{'/'.join(location.split('-'))}.json"
         )

     del self.nodes[pdag_key]

     # Last node
     return node

The execute_unit_remote() itself is a simple ray remote function:

@ray.remote
def execute_unit_remote(
    unit_key: str,
    external_client: ResultClient,
    **dependencies,
) -> ProtocolUnitResult:

    # Get unit from local client
    unit = external_client.load_tokenizable(unit_key)

    dependencies = unflatten_dict(dependencies)

    result = unit.execute(
        **dependencies, raise_error=True, shared=external_client.get_store()
    )

    return _from_dict(result)

Then I just call node.execute() or workflow.run_async(node) for multiple independent DAGs on a single node.
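
For readers unfamiliar with the pattern being described, here is a minimal sketch of the Ray DAG API calls involved (illustrative names only, unrelated to the reporter's code):

import ray

ray.init()

@ray.remote
def step(x):
    # Stand-in for a ProtocolUnit-style piece of work.
    return x + 1

# .bind() builds a lazy DAG node instead of launching the task immediately;
# calling .execute() on the terminal node runs the whole DAG.
node = step.bind(1)
node = step.bind(node)
print(ray.get(node.execute()))  # 3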

Symptoms


Also, this is what memory usage by component looks like (purple is ray::IDLE):


Questions

  1. Is this an expected behavior when multiple large DAGs run from the same entry point?
  2. Is it safe to detect such unaccounted ray::IDLE processes and call SIGKILL on them?

I will be opening a separate issue if I do not find a workaround. Thank you!

rkooo567 commented 1 year ago

@LilDojd thanks for the detailed report!

Is this an expected behavior when multiple large DAGs run from the same entry point?

IDLE processes are supposed to exist while you are running your script, but not after you terminate your job.

Is it safe to detect such unaccounted ray::IDLE processes and call SIGKILL on them?

If your job is already finished and you don't use detached actors, it is mostly okay. I think we actually already do this. It is risky to do it while you are running your script because the idle processes can still own some important metadata.
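
For completeness, here is a sketch of what such cleanup could look like, with the caveat above: only after the job has finished and with no detached actors in use. The psutil-based title match is an assumption, not an official Ray mechanism.

import psutil

# Kill leftover ray::IDLE workers. Per the discussion above, only do this
# once the driver/job has finished and no detached actors are in use.
for p in psutil.process_iter(["pid", "name", "cmdline"]):
    title = " ".join(p.info["cmdline"] or []) or (p.info["name"] or "")
    if title.startswith("ray::IDLE"):
        print("killing", p.info["pid"])
        p.kill()  # SIGKILL on POSIX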

I have a couple more questions here;

  1. When there are lots of idle processes, was your python script already finished, or is it still running?
  2. How many IDLE processes are there when you observe this?

LilDojd commented 1 year ago

@rkooo567

  1. Still running. The memory is released only when the script ends or I kill the idle workers. I haven't yet tested whether IDLE workers persist after the script entrypoint ends when the head node is spun up explicitly with ray start.
  2. Definitely more than the number of CPUs assigned to the head node, and even more than are available on the server. I'd say 50-ish.

rkooo567 commented 1 year ago

Hmm, actually having idle processes alive while running a script is not unexpected behavior (it is intended, actually). I think the main questions here are:

  1. How many processes exactly do you have? Also note that Ray can start a new worker if you call ray.get inside a task.
  2. How much memory does each IDLE process use? What is the value of RES - shm? (It'd be great if you could show me a screenshot; see the sketch below for one way to compute it.)
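
A sketch of computing RES minus shared memory per ray::IDLE worker with psutil (Linux-specific memory_info fields; not from the thread):

import psutil

# On Linux, psutil's memory_info() exposes rss (RES in top) and shared
# (SHR in top), so RES - shm can be reported per idle worker.
for p in psutil.process_iter(["pid", "name", "cmdline", "memory_info"]):
    title = " ".join(p.info["cmdline"] or []) or (p.info["name"] or "")
    mem = p.info["memory_info"]
    if title.startswith("ray::IDLE") and mem is not None:
        print(f"pid={p.info['pid']} rss-shm={(mem.rss - mem.shared) / 1024**2:.1f} MiB")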

anyscalesam commented 1 year ago

@LilDojd can you advise on @rkooo567 questions?