beacon-biosignals / Ray.jl

Julia API for Ray

Parse runtime info from GlobalStateAccessor #52

Open glennmoy opened 1 year ago

glennmoy commented 11 months ago

Params we need:


Looking through the python code the ray.init() function contains a mix of behaviours depending on whether it's connecting to an existing cluster or not... we want to look at the case where it's connecting to an existing cluster...

Note: it may be sufficient to simply search for the address of an existing cluster. The cluster address is easy enough to get if not provided; it lives in a new file:

If no address is provided, try to find an existing Ray instance to connect to. This is done by first checking the environment variable RAY_ADDRESS. If this is not defined, check the address of the latest cluster started (found in /tmp/ray/ray_current_cluster) if available. If this is also empty, then start a new local Ray instance.
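The lookup order quoted above can be sketched in a few lines of Python. This is only an illustration of that order, not Ray's own implementation: `find_cluster_address` is a hypothetical helper name, the `temp_dir` default is taken from the path mentioned above, and the sketch ignores any special address values that `ray.init()` may accept.

```python
import os
from pathlib import Path
from typing import Optional


def find_cluster_address(address: Optional[str] = None,
                         temp_dir: str = "/tmp/ray") -> Optional[str]:
    """Hypothetical helper: resolve the address of an existing cluster.

    Order: explicit argument, then RAY_ADDRESS, then the
    ray_current_cluster file. Returns None if nothing is found, which is
    the case where ray.init() would start a new local instance.
    """
    if address:
        return address

    env_address = os.environ.get("RAY_ADDRESS")
    if env_address:
        return env_address

    cluster_file = Path(temp_dir) / "ray_current_cluster"
    if cluster_file.is_file():
        contents = cluster_file.read_text().strip()
        if contents:
            return contents

    return None
```

On a machine with a running local cluster, `find_cluster_address()` would be expected to return the contents of `/tmp/ray/ray_current_cluster`.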

This branch is taken when canonicalize_bootstrap_address returns a non-None bootstrap_address:

redis_address, gcs_address = None, None
bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir)
if bootstrap_address is not None:
    gcs_address = bootstrap_address
    logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address)

which internally calls get_ray_address_from_environment. This does a few things but the ones that interest us are:

I think it uses a combination of these to support legacy behaviour. For our purposes the address living in /tmp/ray/ray_current_cluster should do us fine.

In any case, this sets gcs_address = <address parsed from /tmp/ray/ray_current_cluster>...

I do wonder whether this is now sufficient to connect to the raylet, i.e. whether we can somehow get the rest of the information from the GlobalStateAccessor. I'll spend a bit more time investigating this, but in the worst case we should be able to get everything else from new files as well:

$ cat /tmp/ray/ray_current_cluster
127.0.0.1:6379

$ cat /tmp/ray/session_latest/node_ip_address.json
{"node_ip_address": "127.0.0.1"}

$ ls /tmp/ray/session_latest/sockets
plasma_store= raylet=

$ cat /tmp/ray/session_latest/ports_by_node.json
{"127.0.0.1:/tmp/ray/session_2023-10-17_13-29-06_634039_96825/sockets/plasma_store": {"metrics_agent_port": 65234, "metrics_export_port": 65384, "dashboard_agent_listen_port": 52365, "runtime_env_agent_port": 64918}}
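As a rough sketch of the "read it from files" fallback, the snippet below collects the node IP and the two socket paths from a session directory laid out like the listing above. `read_session_info` is a hypothetical helper, not part of Ray or Ray.jl; it assumes the file names shown above and does not check that the sockets actually exist. The node manager port is not in any of the files shown, so it is left out.

```python
import json
from pathlib import Path


def read_session_info(session_dir: str = "/tmp/ray/session_latest") -> dict:
    """Hypothetical helper: gather connection info from a Ray session dir."""
    session = Path(session_dir)

    # node_ip_address.json holds a single-key JSON object.
    ip_file = session / "node_ip_address.json"
    node_ip = json.loads(ip_file.read_text())["node_ip_address"]

    # The raylet and object store sockets live under <session>/sockets.
    sockets = session / "sockets"
    return {
        "node_ip_address": node_ip,
        "raylet_socket": str(sockets / "raylet"),
        "plasma_store_socket": str(sockets / "plasma_store"),
    }
```

Because `session_latest` is a symlink, the returned paths go through the symlink rather than naming the timestamped session directory; resolving it first (for example with `Path.resolve()`) would give paths like the ones in `ports_by_node.json`.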

The rest of the init function passes the address to RayParams, which goes on to configure a Node class; that class seems to initialize the driver process before connecting it to the raylet.

Let's see how this gets the rest of the params we want

The node_ip_address is read by wait_and_get_for_node_address, which reads the file named by RAY_NODE_IP_FILENAME = "node_ip_address.json".

The store socket name, raylet socket name, and node manager port are returned by get_node_to_connect_for_driver which is a wrapper for a method of GlobalStateAccessor (!). This passes back a dict of the various params.

So it looks like our ingredients are:

kleinschmidt commented 11 months ago

Re-opening this because #214 was causing segfaults on the cluster.

glennmoy commented 11 months ago

Brief summary of the situation for posterity:

[ Info: Raylet socket: /tmp/ray/session_2023-10-26_23-54-18_508664_1/sockets/raylet, Object store: /tmp/ray/session_2023-10-26_23-54-18_508664_1/sockets/plasma_store, Node IP: 10.0.29.28, Node port: 41371, GCS Address: 10.0.29.28:6379, JobID: JobID(21)
ERROR: LoadError: basic_string::_M_create
Stacktrace:
 [1] initialize_driver(arg1::CxxWrap.StdLib.StdStringDereferenced, arg2::CxxWrap.StdLib.StdStringDereferenced, arg3::String, arg4::String, arg5::Int32, arg6::Ray.ray_julia_jll.JobIDAllocated, arg7::String, arg8::CxxWrap.StdLib.StdStringAllocated)
   @ Ray.ray_julia_jll /usr/local/share/julia-depot/4c99afcd78/packages/CxxWrap/5IZvn/src/CxxWrap.jl:624
 [2] init(runtime_env::Ray.RuntimeEnv; session_dir::String, logs_dir::String)
   @ Ray /usr/local/share/julia-depot/4c99afcd78/dev/MVP/dev/Ray/src/runtime.jl:126
 [3] init
   @ /usr/local/share/julia-depot/4c99afcd78/dev/MVP/dev/Ray/src/runtime.jl:68 [inlined]
 [4] compute_features_ray(; debug::NamedTuple{(:flag, :n), Tuple{Bool, Int64}})
   @ v0_4_4 /usr/local/share/julia-depot/4c99afcd78/dev/MVP/src/v0_4_4.jl:209
 [5] top-level scope
   @ /tmp/ray/session_2023-10-26_23-54-18_508664_1/runtime_resources/working_dir_files/_ray_pkg_4e780a0653e96063/run.jl:7
in expression starting at /tmp/ray/session_2023-10-26_23-54-18_508664_1/runtime_resources/working_dir_files/_ray_pkg_4e780a0653e96063/run.jl:7

Local disk full error:

ERROR: LoadError: ray_julia_jll.put returned Status::Unknown: Local disk is full
Stacktrace:
 [1] error(s::String)
   @ Base ./error.jl:35
 [2] serialize_args(args::Vector{Pair{Symbol, Any}})
   @ Ray /usr/local/share/julia-depot/ab14e38af3/dev/Ray/src/runtime.jl:267
 [3] submit_task(f::Function, args::Tuple{DataFrames.DataFrame, AWSS3.S3Path{Nothing}}, kwargs::NamedTuple{(), Tuple{}}; runtime_env::Nothing, resources::Dict{String, Float64}, max_retries::Int64)
   @ Ray /usr/local/share/julia-depot/ab14e38af3/dev/Ray/src/runtime.jl:212
 [4] submit_task
   @ /usr/local/share/julia-depot/ab14e38af3/dev/Ray/src/runtime.jl:206 [inlined]
 [5] compute_spectral_features(segments::DataFrames.DataFrame; debug::NamedTuple{(:flag, :n), Tuple{Bool, Int64}})
   @ v0_4_4 /usr/local/share/julia-depot/4c99afcd78/dev/clean-sleep-migration-v0.4.4/src/v0_4_4.jl:134
 [6] macro expansion
   @ /usr/local/share/julia-depot/4c99afcd78/dev/clean-sleep-migration-v0.4.4/src/v0_4_4.jl:210 [inlined]
 [7] macro expansion
   @ ./timing.jl:273 [inlined]
 [8] compute_features_ray(; debug::NamedTuple{(:flag, :n), Tuple{Bool, Int64}})
   @ v0_4_4 /usr/local/share/julia-depot/4c99afcd78/dev/clean-sleep-migration-v0.4.4/src/v0_4_4.jl:209
 [9] top-level scope
   @ /tmp/ray/session_2023-11-01_12-12-35_455050_1/runtime_resources/working_dir_files/_ray_pkg_4e780a0653e96063/migration.jl:7
in expression starting at /tmp/ray/session_2023-11-01_12-12-35_455050_1/runtime_resources/working_dir_files/_ray_pkg_4e780a0653e96063/migration.jl:7

---------------------------------------
Job 'raysubmit_VLYhVWJZweNZ1xik' failed
---------------------------------------

Status message: Job failed due to an application error, last available logs (truncated to 20,000 chars):
   @ v0_4_4 /usr/local/share/julia-depot/4c99afcd78/dev/clean-sleep-migration-v0.4.4/src/v0_4_4.jl:134
 [6] macro expansion
   @ /usr/local/share/julia-depot/4c99afcd78/dev/clean-sleep-migration-v0.4.4/src/v0_4_4.jl:210 [inlined]
 [7] macro expansion
   @ ./timing.jl:273 [inlined]
 [8] compute_features_ray(; debug::NamedTuple{(:flag, :n), Tuple{Bool, Int64}})
   @ v0_4_4 /usr/local/share/julia-depot/4c99afcd78/dev/clean-sleep-migration-v0.4.4/src/v0_4_4.jl:209
 [9] top-level scope
   @ /tmp/ray/session_2023-11-01_12-12-35_455050_1/runtime_resources/working_dir_files/_ray_pkg_4e780a0653e96063/migration.jl:7
in expression starting at /tmp/ray/session_2023-11-01_12-12-35_455050_1/runtime_resources/working_dir_files/_ray_pkg_4e780a0653e96063/migration.jl:7

Disconnect error:

[ Info: Processing d4a3ec81-8e78-490e-993d-3b81b568c6cd_spectral.arrow...
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 665, in urlopen
    httplib_response = self._make_request(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 421, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 416, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1348, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 316, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/client.py", line 285, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
http.client.RemoteDisconnected: Remote end closed connection without response

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/requests/adapters.py", line 439, in send
    resp = conn.urlopen(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 719, in urlopen
    retries = retries.increment(
  File "/usr/lib/python3/dist-packages/urllib3/util/retry.py", line 400, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/lib/python3/dist-packages/six.py", line 702, in reraise
    raise value.with_traceback(tb)
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 665, in urlopen
    httplib_response = self._make_request(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 421, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 416, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1348, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 316, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/client.py", line 285, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
urllib3.exceptions.ProtocolError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/.local/bin/ray", line 8, in <module>
    sys.exit(main())
  File "/home/ubuntu/.local/lib/python3.8/site-packages/ray/scripts/scripts.py", line 2462, in main
    return cli()
  File "/usr/lib/python3/dist-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/usr/lib/python3/dist-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/usr/lib/python3/dist-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/lib/python3/dist-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/lib/python3/dist-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/lib/python3/dist-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/ray/dashboard/modules/job/cli_utils.py", line 44, in wrapper
    return func(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/ray/autoscaler/_private/cli_logger.py", line 856, in wrapper
    return f(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/ray/dashboard/modules/job/cli.py", line 247, in submit
    get_or_create_event_loop().run_until_complete(_tail_logs(client, job_id))
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/ray/dashboard/modules/job/cli.py", line 68, in _tail_logs
    _log_job_status(client, job_id)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/ray/dashboard/modules/job/cli.py", line 48, in _log_job_status
    info = client.get_job_info(job_id)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/ray/dashboard/modules/job/sdk.py", line 331, in get_job_info
    r = self._do_request("GET", f"/api/jobs/{job_id}")
  File "/home/ubuntu/.local/lib/python3.8/site-packages/ray/dashboard/modules/dashboard_sdk.py", line 303, in _do_request
    return requests.request(
  File "/usr/lib/python3/dist-packages/requests/api.py", line 60, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 535, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 648, in send
    r = adapter.send(request, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))