beacon-biosignals / Ray.jl

Julia API for Ray

Support passing object references between workers #77

Closed omus closed 1 year ago

omus commented 1 year ago

At the moment when an object reference is created it must be used on the worker where it was created. For example:

julia> using Ray

julia> Ray.init()

julia> x = Ray.put(1)
ray_core_worker_julia_jll.ObjectIDAllocated(Ptr{Nothing} @0x0000600003a15140)

julia> y = Ray.submit_task(identity, (x,))
ray_core_worker_julia_jll.ObjectIDAllocated(Ptr{Nothing} @0x000060000392f990)

julia> Ray.get(y)
ray_core_worker_julia_jll.ObjectIDAllocated(Ptr{Nothing} @0x0000000000000000)

julia> Ray.get(Ray.get(y))
ERROR: C++ object of type N3ray8ObjectIDE was deleted
Stacktrace:
 [1] get(arg1::Union{ConstCxxRef{ray_core_worker_julia_jll.ObjectID}, CxxRef{ray_core_worker_julia_jll.ObjectID}, ray_core_worker_julia_jll.ObjectID})
   @ ray_core_worker_julia_jll ~/.julia/packages/CxxWrap/aXNBY/src/CxxWrap.jl:624
 [2] get(oid::ray_core_worker_julia_jll.ObjectIDAllocated)
   @ Ray ~/.julia/dev/ray_core_worker_julia_jll/Ray.jl/src/object_store.jl:23
 [3] top-level scope
   @ REPL[5]:1

There are two issues going on here:

  1. Julia serializes Ptrs as C_NULL
  2. Ray needs to be informed about object references contained within serialized data (see Ray's prepare_args_internal Cython code)
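To illustrate issue 1, here is a sketch of custom `Serialization` hooks that round-trip an object ID as stable bytes rather than a raw pointer. `ObjectRefSketch` is a simplified stand-in type, not the real Ray.jl `ObjectRef` (which wraps a C++ object), and issue 2 would still need handling where noted:

```julia
using Serialization

# Simplified stand-in for Ray.jl's ObjectRef: store the object ID as a hex
# string rather than a Ptr, so serialization cannot degrade it to C_NULL.
struct ObjectRefSketch
    oid_hex::String
end

function Serialization.serialize(s::AbstractSerializer, ref::ObjectRefSketch)
    Serialization.serialize_type(s, typeof(ref))
    serialize(s, ref.oid_hex)  # serialize stable bytes, never a raw pointer
end

function Serialization.deserialize(s::AbstractSerializer, ::Type{ObjectRefSketch})
    oid_hex = deserialize(s)::String
    # Issue 2 would be handled here: inform Ray that this worker now holds
    # a reference to `oid_hex` (cf. Ray's prepare_args_internal).
    return ObjectRefSketch(oid_hex)
end

# Round trip: the deserialized ref carries the same ID bytes, not C_NULL.
io = IOBuffer()
serialize(io, ObjectRefSketch("00ffffffffffffffffffffffffffffffffffffff0100000001000000"))
seekstart(io)
ref = deserialize(io)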

Implementing 1 without 2 results in:

julia> Ray.get(obj_ref)
[2023-08-29 14:14:35,830 C 28116 3133759] wrapper.cc:207:  Check failed: _s.ok() Bad status: ObjectUnknownOwner: An application is trying to access Ray objects whose owner is unknown(00ef45ccd0112571ffffffffffffffffffffffff0100000002000000 ). Please make sure that all Ray objects you are trying to access are part of the current Ray session. Note that object IDs generated randomly (ObjectID.from_random()) or out-of-band (ObjectID.from_binary(...)) cannot be passed as a task argument because Ray does not know which task created them. If this was not how your object ID was generated, please file an issue at https://github.com/ray-project/ray/issues/
*** StackTrace Information ***
0   julia_core_worker_lib.so            0x00000001348b57e0 _ZN3raylsERNSt3__113basic_ostreamIcNS0_11char_traitsIcEEEERKNS_10StackTraceE + 84 ray::operator<<()
1   julia_core_worker_lib.so            0x00000001348e2d30 _ZN3ray13SpdLogMessage5FlushEv + 220 ray::SpdLogMessage::Flush()
2   julia_core_worker_lib.so            0x00000001348e2bb0 _ZN3ray13SpdLogMessageD2Ev + 24 ray::SpdLogMessage::~SpdLogMessage()
3   julia_core_worker_lib.so            0x00000001348b8874 _ZN3ray6RayLogD2Ev + 52 ray::RayLog::~RayLog()
4   julia_core_worker_lib.so            0x0000000134006fb0 _Z3getN3ray8ObjectIDE + 620 get()
5   julia_core_worker_lib.so            0x0000000134066874 _ZNSt3__110__function6__funcIPFNS_10shared_ptrIN3ray6BufferEEENS3_8ObjectIDEENS_9allocatorIS8_EES7_EclEOS6_ + 56 std::__1::__function::__func<>::operator()()
6   julia_core_worker_lib.so            0x00000001340665bc _ZN5jlcxx6detail11CallFunctorINSt3__110shared_ptrIN3ray6BufferEEEJNS4_8ObjectIDEEE5applyEPKvNS_13WrappedCppPtrE + 92 jlcxx::detail::CallFunctor<>::apply()
...

The plan to tackle this issue is to:

omus commented 1 year ago

Breadcrumbs for potential debugging sessions:

omus commented 1 year ago

I've managed to get the example above working by capturing nested object references used within serialized data. I came up with a new test which seems to require additional changes:

# use object ref arguments in task
obj_ref = Ray.put(1)
result = Ray.submit_task(Ray.get, (obj_ref,))
@test result isa ObjectRef
@test result != obj_ref
@test Ray.get(result) == 1

In this example, when the task attempts to retrieve the passed-in object ref, the Ray.get ends up waiting indefinitely. I suspected that the problem was that the serialized ObjectRef only contained the object ID information, that it lacked the address information, and that using Ray.get from within a task was failing as it assumed the ID referred to a value created by the local task. That guess appears to be incorrect, as the bytes of an object ID should contain that information. I've additionally checked the Python code, which just uses the object IDs for ray.get internally.

Note: for those familiar with Python Ray, that system automatically performs ray.get on all passed-in arguments, and calling ray.get on a non-ObjectRef will fail, so this test is invalid there. You can perform a similar test by wrapping an ObjectRef in another ObjectRef.
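A Julia analogue of that nested-ref test might look like the following. This is a hedged sketch assuming `Ray.put` accepts an `ObjectRef` as its value, which has not been verified against the current Ray.jl API:

```julia
using Ray, Test

Ray.init()

inner = Ray.put(1)        # ObjectRef to the value 1
outer = Ray.put(inner)    # ObjectRef wrapping another ObjectRef

mid = Ray.get(outer)      # unwrapping once should yield the inner ref
@test mid isa ObjectRef
@test Ray.get(mid) == 1   # unwrapping twice yields the value
```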

omus commented 1 year ago

Solving this issue has turned out to be more complicated than originally outlined. I've been experimenting with the core worker's reference-counting system and other areas which look related. During my spelunking I found this tidbit:

  /// Get the RPC address of the worker that owns the given object.
  ///
  /// \param[in] object_id The object ID. The object must either be owned by
  /// us, or the caller previously added the ownership information (via
  /// RegisterOwnershipInfoAndResolveFuture).
  /// \param[out] The RPC address of the worker that owns this object.
  Status GetOwnerAddress(const ObjectID &object_id, rpc::Address *owner_address) const;

https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/core_worker/core_worker.h#L464-L466

In particular note the reference to RegisterOwnershipInfoAndResolveFuture which has this docstring:

  /// Add a reference to an ObjectID that was deserialized by the language
  /// frontend. This will also start the process to resolve the future.
  /// Specifically, we will periodically contact the owner, until we learn that
  /// the object has been created or the owner is no longer reachable. This
  /// will then unblock any Gets or submissions of tasks dependent on the
  /// object.

This is exactly what we want to do. Looking at the Python Ray client for an example, we find the functions deserialize_and_register_object_ref and serialize_object_ref, which include the owner worker address of the ObjectID as part of the serialized ObjectRef.

While experimenting with this I've realized that we can predict whether we'll see failures when performing Ray.get by first checking if the ObjectID has an owner associated with it (added in #102). Having this check is also needed for registering ownership, as the core worker backend doesn't like it if you register an already-registered object.
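A rough Julia translation of what the Python serialize_object_ref / deserialize_and_register_object_ref pair does might look like the following. This is pseudocode under stated assumptions: `get_owner_address`, `get_serialized_object_status`, `has_owner`, and `register_ownership` are hypothetical stand-ins for the corresponding core worker bindings, not real Ray.jl names:

```julia
# Hypothetical sketch of Python-style ObjectRef serialization: the payload
# carries the ID bytes, the owner's rpc::Address, and the serialized object
# status, so the receiver can register ownership on deserialization.
function serialize_object_ref(ref::ObjectRef)
    owner_address = get_owner_address(ref)      # hypothetical binding
    status = get_serialized_object_status(ref)  # hypothetical binding
    return (oid = hex_identifier(ref), owner_address = owner_address, status = status)
end

function deserialize_object_ref(payload, outer_oid)
    ref = ObjectRef(payload.oid)
    # Only register if unowned: the core worker backend rejects registering
    # an already-registered object (hence the check added in #102).
    if !has_owner(ref)
        register_ownership(ref, payload.owner_address, payload.status, outer_oid)
    end
    return ref
end
```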

I'm currently dealing with this problem:

[2023-09-08 14:27:01,854 E 75997 10878494] core_worker.cc:573: :info_message: Attempting to recover 1 lost objects by resubmitting their tasks. To disable object reconstruction, set @ray.remote(max_retries=0).

Which can be reproduced with:

using Ray
Ray.init()
local_ref = Ray.put(3)
return_ref = Ray.submit_task(Ray.get, (local_ref,))
Ray.get(return_ref)
kleinschmidt commented 1 year ago

Doing a bit more digging starting from RegisterOwnershipInfoAndResolveFuture: this only seems to get called in the Python serialization handlers: https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/python/ray/_private/serialization.py#L71-L97

omus commented 1 year ago

I've been working through the last error above and believe it may just be because we aren't specifying the outer object ID correctly, which RegisterOwnershipInfoAndResolveFuture requires. I implemented a quick POC of this and it does work. However, attempting to get the object ID some time later causes it to fail:

julia> using Ray

julia> Ray.init()
[ Info: Raylet socket: /tmp/ray/session_2023-09-08_16-18-03_549076_80116/sockets/raylet, Object store: /tmp/ray/session_2023-09-08_16-18-03_549076_80116/sockets/plasma_store, Node IP: 127.0.0.1, Node port: 50964, GCS Address: 127.0.0.1:6379
[ Info: Connecting function manager to GCS at 127.0.0.1:6379...

julia> local_ref = Ray.put(3)
ObjectRef("00ffffffffffffffffffffffffffffffffffffff0100000001000000")

julia> return_ref = Ray.submit_task(Ray.get, (local_ref,))
ObjectRef("c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000")

julia> Ray.get(return_ref)
3

julia> sleep(10)

julia> Ray.get(return_ref)
[2023-09-08 16:18:18,023 C 80124 10958167] wrapper.cc:204:  Check failed: _s.ok() Bad status: IOError: Broken pipe
*** StackTrace Information ***
0   julia_core_worker_lib.so            0x000000013a357730 _ZN3raylsERNSt3__113basic_ostreamIcNS0_11char_traitsIcEEEERKNS_10StackTraceE + 84 ray::operator<<()
1   julia_core_worker_lib.so            0x000000013a384c80 _ZN3ray13SpdLogMessage5FlushEv + 220 ray::SpdLogMessage::Flush()
2   julia_core_worker_lib.so            0x000000013a384b00 _ZN3ray13SpdLogMessageD2Ev + 24 ray::SpdLogMessage::~SpdLogMessage()
3   julia_core_worker_lib.so            0x000000013a35a7c4 _ZN3ray6RayLogD2Ev + 52 ray::RayLog::~RayLog()
4   julia_core_worker_lib.so            0x0000000139a7401c _Z3getN3ray8ObjectIDEx + 664 get()
5   julia_core_worker_lib.so            0x0000000139b5f314 _ZNSt3__110__function6__funcIPFNS_10shared_ptrIN3ray9RayObjectEEENS3_8ObjectIDExENS_9allocatorIS8_EES7_EclEOS6_Ox + 60 std::__1::__function::__func<>::operator()()
6   julia_core_worker_lib.so            0x0000000139b5f1bc _ZN5jlcxx6detail11CallFunctorINSt3__110shared_ptrIN3ray9RayObjectEEEJNS4_8ObjectIDExEE5applyEPKvNS_13WrappedCppPtrEx + 104 jlcxx::detail::CallFunctor<>::apply()
7   ???                                 0x0000000127044524 0x0 + 4949558564 0x0
8   libjulia-internal.1.9.dylib         0x0000000100fea68c do_call + 188 do_call
9   libjulia-internal.1.9.dylib         0x0000000100fe8eb0 eval_body + 1476 eval_body
10  libjulia-internal.1.9.dylib         0x0000000100fe94fc jl_interpret_toplevel_thunk + 260 jl_interpret_toplevel_thunk
11  libjulia-internal.1.9.dylib         0x00000001010006e4 jl_toplevel_eval_flex + 4620 jl_toplevel_eval_flex
12  libjulia-internal.1.9.dylib         0x0000000101000608 jl_toplevel_eval_flex + 4400 jl_toplevel_eval_flex
13  libjulia-internal.1.9.dylib         0x000000010100142c ijl_toplevel_eval_in + 156 ijl_toplevel_eval_in
14  sys.dylib                           0x00000001171fa890 japi1_eval_user_input_59917 + 940 japi1_eval_user_input_59917
15  sys.dylib                           0x00000001172cd948 julia_YY.run_replYY.59_59855 + 1460 julia_YY.run_replYY.59_59855
16  sys.dylib                           0x0000000117287854 julia_YY.1017_44893 + 1308 julia_YY.1017_44893
17  libjulia-internal.1.9.dylib         0x0000000100fde2f0 jl_f__call_latest + 76 jl_f__call_latest
18  sys.dylib                           0x0000000116c042b4 julia_run_main_repl_48075 + 1272 julia_run_main_repl_48075
19  libjulia-internal.1.9.dylib         0x00000001010286d8 true_main + 192 true_main
20  libjulia-internal.1.9.dylib         0x00000001010285cc jl_repl_entrypoint + 180 jl_repl_entrypoint
21  julia                               0x0000000100a53f6c main + 12 main
22  dyld                                0x00000001a8813f28 start + 2236 start

I do believe I observed an instance where the returned value didn't match, so maybe we aren't out of the woods.

Update: Something else interesting about this example is that running Ray.get(local_ref) after the object is lost results in the Ray.get timing out (if enabled). This is especially odd as Ray.put objects are pinned and we know that the driver process is still running.

Update 2: The POC mentioned followed how Python does things by performing the registration during deserialization. In Julia when we do this, the Address we create only exists during the deserialize method, so I believe what is going on is that the Julia GC triggers the finalizer on the Address while it is still referenced by the core worker code. The alternative way of handling outer object refs, storing that information in the ObjectRef structure and registering outside of deserialization, should work around this.
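A minimal sketch of that alternative: keep the deserialized owner address alive inside the ObjectRef itself so the Julia GC cannot finalize it while the core worker still references it, and register lazily where the outer object ID is known. Field and function names here (`ensure_registered!`, `register_ownership`) are illustrative, not real Ray.jl API:

```julia
# Sketch: the ObjectRef owns its serialized owner-address bytes, so they
# live at least as long as the ref does (no premature finalization).
mutable struct ObjectRef
    oid_hex::String
    owner_address::Union{Nothing,Vector{UInt8}}  # serialized rpc::Address bytes
    registered::Bool
end

# Registration happens lazily, e.g. inside Ray.get, where the outer
# object ID is available, instead of during deserialization.
function ensure_registered!(ref::ObjectRef, outer_oid)
    if !ref.registered && ref.owner_address !== nothing
        register_ownership(ref, ref.owner_address, outer_oid)  # hypothetical binding
        ref.registered = true
    end
    return ref
end
```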

omus commented 1 year ago

Using RegisterOwnershipInfoAndResolveFuture has proven to be trickier than expected. While working on #108 we've seen issues with deserialization and corrupt object references which seem to be related to the "ResolveFuture" portion of the code.

My best guess is that there is some asynchronous C++ code that isn't working well with Julia. Next steps for looking into this issue would be to try to write a custom "RegisterOwnership" function which skips the future-resolving portion of the code. Alternatively, we can try tracing the code path that happens in this function to see what's going wrong.

If someone else picks up this task I would recommend reviewing the Python code that handles the ownership registration. One thing to call out is that Python performs the registration as part of deserialization, whereas I opted to separate these, as registration requires the "outer_object_id" which is cleaner to obtain within Ray.get. I did attempt to implement the Python-style approach, which can be seen in: https://github.com/beacon-biosignals/Ray.jl/pull/108/commits/dc92d45aa203016b14c89d1e359251899e035231

kleinschmidt commented 1 year ago

Another breadcrumb here comes from how the return ObjectRefs are initialized at task creation time:

https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/src/ray/core_worker/task_manager.cc#L185-L193

  for (size_t i = 0; i < num_returns; i++) {
    auto return_id = spec.ReturnId(i);
    if (!spec.IsActorCreationTask()) {
      bool is_reconstructable = max_retries != 0;
      // We pass an empty vector for inner IDs because we do not know the return
      // value of the task yet. If the task returns an ID(s), the worker will
      // publish the WaitForRefRemoved message that we are now a borrower for
      // the inner IDs. Note that this message can be received *before* the
      // PushTaskReply.
      // NOTE(swang): We increment the local ref count to ensure that the
      // object is considered in scope before we return the ObjectRef to the
      // language frontend. Note that the language bindings should set
      // skip_adding_local_ref=True to avoid double referencing the object.
      reference_counter_->AddOwnedObject(return_id,
                                         /*inner_ids=*/{},
                                         caller_address,
                                         call_site,
                                         -1,
                                         is_reconstructable,
                                         /*add_local_ref=*/true);
    }

    return_ids.push_back(return_id);
    rpc::ObjectReference ref;
    ref.set_object_id(spec.ReturnId(i).Binary());
    ref.mutable_owner_address()->CopyFrom(caller_address);
    ref.set_call_site(call_site);
    returned_refs.push_back(std::move(ref));
  }

I'm not sure whether we actually need to handle this message on the language frontend, or whether it's handled somehow by the core worker, but it does point to a possible cause for why our case of "return an object ref created on the worker" is behaving badly.

kleinschmidt commented 1 year ago

more spelunking, will keep notes here:

Object lifecycle notes

Java

misc links/notes

Tests for reference counting: https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/java/test/src/main/java/io/ray/test/ReferenceCountingTest.java

Creating an ObjectRef from an existing ID calls addLocalReference: https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java#L61 (but there's an arg to disable that, presumably so that when you initialize a new object ref after put you don't double-count it)

There's some additional bookkeeping that the object reference does, so that when an object is finalized and there aren't any other local references hanging around, it can be registered as released with the raylet. This is managed by an ObjectId => WeakReference<ObjectRefImpl> map. I don't think we need to mess with this and should just call the core worker to increment the local ref count on construction and decrement it on finalization.

what happens when a task is launched (driver/launcher)?

Tasks are launched with a .remote(args...) syntax like Python. I think the @ray.remote decorator creates a TaskCaller around the wrapped method/function.

Here's where .remote is defined (I think) for java remote functions: https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/java/api/src/main/java/io/ray/api/call/TaskCaller.java#L28

callNormalFunction does the arg wrapping https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java#L335 by calling ArgumentsBuilder.wrap(args, language)

ArgumentsBuilder.wrap does the by-value/by-reference/serialization juggling https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java#L28

Next up, callNormalFunction prepares and populates the return arguments. This has two stages:

I think the reason for separating these stages is that the Java runtime is keeping track of object IDs that it's managing as part of manual memory management

What happens when a task is executed (worker)?

RayNativeRuntime, the C++ file with the task executor: https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc#L110

That in turn calls the execute method on the TaskExecutor class: https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java#L92

arguments are unwrapped: https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java#L73-L86

results are serialized with ObjectSerializer.serialize https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java#L149

What happens when you call get?

https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java#L130

kleinschmidt commented 1 year ago

Okay, a few conclusions at this point:

First, one big thing I'm noticing from reading the core worker header comments (the closest thing we have to dev docs here) and the java code is that we're meant to be including the owner address and the "serialized object status" when we serialize the object ref and include that in the payload. I dunno if that's what your prototype was doing @omus but it seems like a promising place to start.

Second, I think we need to "register" local refs on construction of our ObjectRef wrapper, and remove them in the finalizer. We'll need to add a flag to the constructor that allows us to skip that check in some (internal) situations, like when we create an object ref during put/task creation (since I think those are initialized with a ref count of 1 already). https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/src/ray/core_worker/core_worker.h#L429-L451
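The proposed constructor/finalizer ref counting could be sketched as follows. This is a hedged outline: `add_local_reference` / `remove_local_reference` stand in for bindings to the core worker's AddLocalReference / RemoveLocalReference, and are not existing Ray.jl functions:

```julia
# Sketch: ObjectRef registers a local reference on construction and
# removes it when the Julia GC finalizes the ref.
mutable struct ObjectRef
    oid_hex::String
    function ObjectRef(oid_hex::String; add_local_ref::Bool=true)
        ref = new(oid_hex)
        # Skip incrementing when the core worker already counted the ref,
        # e.g. the AddOwnedObject call during put/task creation.
        add_local_ref && add_local_reference(oid_hex)  # hypothetical binding
        finalizer(ref) do r
            remove_local_reference(r.oid_hex)  # hypothetical binding; release on GC
        end
        return ref
    end
end
```

Internal call sites that receive already-counted refs (put, task return values) would construct with `add_local_ref=false` to avoid double counting, mirroring the skip_adding_local_ref note in the C++ comment above.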

omus commented 1 year ago

First, one big thing I'm noticing from reading the core worker header comments (the closest thing we have to dev docs here) and the java code is that we're meant to be including the owner address and the "serialized object status" when we serialize the object ref and include that in the payload. I dunno if that's what your prototype was doing @omus but it seems like a promising place to start.

The prototype did this and it's currently in place: https://github.com/beacon-biosignals/Ray.jl/blob/main/src/object_ref.jl#L72-L75

We may want to verify the content though.

kleinschmidt commented 1 year ago

This is done via #126, #138, and #140.