ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.05k stars 5.59k forks source link

[Feature] Rust API for Ray #20609

Closed jon-chuang closed 1 year ago

jon-chuang commented 2 years ago

Search before asking

Description

Problem Description

Introduction

Ray currently allows for a very attractive distributed programming model that can be embedded within existing programming languages, offering low latency and fine-grained control of tasks and shared objects.

There are even optimized communication models, such as NCCL's allreduce, although these are pretty task-specific.

The most prominent example of this embedding is Python, the dominant general purpose language for ML/data science. There are also embeddings for Java and C++ which are commonly used in enterprise systems.

A natural next step, the subject of this proposal, Rust, is often touted as the successor to C++ in modern systems. Its popularity is due to its memory and thread safety, user-friendly, zero-cost functional programming idioms, ergonomic packaging and build system, while retaining C++/C-like performance, avoiding GC unpredictability, and having small memory footprint.

There are many new projects in the data/distributed compute industry building on Rust, including InfluxDB-IOx (time series DB), Materialize (streaming SQL view maintenance) built on Timely/Differential Dataflow, Datafusion/Ballista/Arrow-rs (SQL query engine), Databend.rs (realtime analytics), Fluvio (distributed streaming), which can all run distributed over a cluster, constellation-rs/the Hadean SDK (distributed compute platform), polar-rs (dataframe lib), delta-rs (Apache Delta table interface).

We expect that the number of such systems to grow going forward, possibly including next-gen distributed simulation/real-time engines (e.g. games, self-driving, traffic, large scale physics), distributed computing (graphs), databases and query engines, and other forms of distributed execution.

Exposing a Rust API would allow the growing Rust community to leverage Ray's programming model and possibly drive improvements in the underlying Ray system.

Considerations

The Rust community may not like the thought of using a C++ library (being memory and thread unsafe) under the hood as opposed to a pure Rust library. But as these things go, the benefits may outweigh the reservations.

Alternative libraries for distributed computation also exist in Rust, such as timely-dataflow and constellation-rs. The former is dataflow-based with automatic pipelined communication focusing on data-parallel workloads, and the latter is process-based with explicit communication and (I believe) no built-in fault tolerance, with a spinoff library amadeus doing map reduce style data-parallel stream computation.

However, just like is the case for many of Ray's workloads, this style of distributed computation may not be suitable to the types of tasks being run, which may demand more fine-grained control, while programming with explicit communication may have high cognitive overhead.

Requirements of a Worker

A worker must be able to:

  1. Talk to the runtime (raylet - Plasma object store and scheduler). This is largely handled by core_worker.
  2. Expose an API to the user embedded in the language (get, put, wait, remote) accepting that language's native types (or objects), e.g. via generics.
  3. Provide the appropriate object reference semantics within the embedding language to allow for zero-copy reads.

Objective

The Current Structure of the C++ API

The C++ API exposes a minimal runtime interface (native and local mode):

Here is the main runtime API.

Local mode is running on a single node, in a single process and without RPC, mainly for testing. We will begin with developing the local mode API for approach validation and fast iteration.

It also exposes the include files which go beyond the basic Ray API, including:

Finally, the C++ API has the following utils:

Approach

The approach is to use either the autocxx crate or the cxx crate directly to generate a set of workable bindings, either directly to the C++ API if this is feasible, the core_worker directly, or a hybrid of both if deemed necessary, whichever is the happier path. Tests will be created on the Rust side to test out all of the functionality, including more expensive cluster mode integration tests.

Using these tests or otherwise, we will try to find and fix last mile issues, such as functionality that may not play well across language boundaries (e.g. reference counting).

We will use Rust's procedural macros to instantiate tasks and actors which can provide a similarly pleasant API to Python's decorators. We may, in addition, provide idiomatic instantiations adding options as mutating methods to tasks, as those seen in the C++/Java APIs.

Roadmap

Test Cases

As a test case, I'd like to try implementing one option for distributed job scheduling for the Ballista distributed SQL engine (which differs from Spark SQL in having a native runtime with a smaller memory footprint). The current state of the job scheduling there is rather primitive. Possibly, Ray could help with query execution that exploits data locality rather than building such scheduling logic from scratch.

As a second test case, I would like to try to implement timely dataflow on top of ray. Perhaps this could allow for streaming SQL queries on top of Ballista/DataFusion. Although I worry about memory usage.

Future Directions

Cross-language:

Async actors

Multi-threaded actors/tasks

User-specified compression scheme

Direct buffer writing

Use case

No response

Related issues

No response

Are you willing to submit a PR?

mwtian commented 2 years ago

Hi @jon-chuang, great to see this effort! I think the community will be interested in the Ray Rust API proposal, and pros and cons for building on top of Ray C++ API vs core worker API.

Implementing Ray Java and C++ API have been large undertakings, so please don't shy away from reaching out to the Ray team. Ray team are active in the project slack channel to discuss Ray internals, if you have not joined already!

raulchen commented 2 years ago

Glad to see this proposal! I'm a newbie to Rust. But based on my experiences from Java and C++ API, here are some thoughts that might be useful.

  1. Local mode vs cluster mode: local mode in other languages are still incomplete. That is partially because some APIs are semantically incompatible with local mode (kill actor for example), partially because of the extra work. So in our practice, users don't usually use local mode. In addition, setting up single-node Ray cluster is much easier now than before. So I'd say the necessity of local mode is not that big. You may want to consider going with the cluster mode directly.
  2. Built on top of the core worker vs the C++ runtime. Supposedly, a new language binding should be built on top of the core worker. And the C++ runtime should only contain C++-specific code. I'm not sure how much of the C++ runtime can be re-used for Rust, except for the process helper. It's okay for the quick prototype. But if we find something in the C++ runtime can be re-used for other languages, we'd better eventually move them to the core worker.
rkooo567 commented 2 years ago

Btw, we started a thread here; https://ray-distributed.slack.com/archives/C01GJCCA2NT/p1637559584012400

Also, feel free to ping Mingwei Tian from the public slack channel! We'd love to help making the integration successful :)!

jon-chuang commented 2 years ago

Rust API for Ray: Detailed Report and Plan

Hi everyone, thank you for the enthusiasm and support!

I apologize for not replying sooner, I've been busy figuring out more details of the problem.

There are indeed many things to discuss.


General Issues of Concern

  1. Building

    • tl;dr: cxx allows one to construct a C++/Rust interop bridge, defining interface to classes and methods with safe wrapper types. However, it is not magic and the process is still somewhat manual, similar to JNI or pxd.
    • I managed to build a small bridge from Ray to Rust via cxx and Bazel.

I investigated several ways of building:

  1. Building by manually including include dirs into a Cargo-driven build process via the cc crate

    • Problem: cannot leverage existing Bazel build rules, which include remote dependency resolution via http_archive. Even if could be made successful, manual build process is fragile.
  2. Using rust-bindgen/autocxx

    • Rust's bindgen is a way of generating C-style raw pointer bindings based on a traversal of header files. However, when invoking with Bazel, this approach fails on external dependencies like msgpack and absl, very likely due to the fact that these libraries have their own build rules which are opaque to bindgen.
      • autocxx is like bindgen but with safe wrapper types (see cxx). Since autocxx relies internally on a fork of bindgen, I decided not to investigate further since it would likely run into similar issues.
  3. "just works": cxx

    • cxx allows one to construct a C++/Rust interop bridge, defining interface to classes and methods with safe wrapper types. However, it is not magic and the process is still somewhat manual, similar to JNI or pxd.
      • I managed to build a small bridge from Ray to Rust via cxx and Bazel.
        • However, actual interop functionality has not been tested yet.
      • The priority is to get to the point where some basic sanity checks can be performed, i.e. the worker can talk to the raylet.
        • Inspiration should be taken from the C++ API tests as for what would be minimal testing milestones.
  4. Packaging

    • For packaging and publishing on crates.io, I am of the mind to invoke bazel build under the hood in the cargo build.rs file, which would error if the user has not installed Bazel on their system. The build.rs script will then have to link to Cargo the shared object files produced.
- I hope this will work out, because we are taking a slightly different pathway to `tensorflow-sys`, which uses an existing tool `bindgen` to generate code for cargo to compile after generating some shared object files with Bazel. - Action points: figure out how to link Bazel project into Rust project.
  1. Bindings to CoreWorker: we will indeed make bindings to CoreWorker and related types rather than to the C++ API directly.
- It is very unlikely that direct bindings to the C++ API can be generated. For one thing, APIs like `Put`, `Get`, `Task` and `Actor` are templated, and templates/generics rarely play well across the C++/Rust boundary. Thus, one would likely require a more low-level API to embed Rust concepts into. Thus, one should indeed attempt to wrap the `CoreWorker` and some associated types. - In terms of the `CoreWorker` API: - The `Put` API offered by `CoreWorker` accepts a `RayObject` - This is just a wrapper around the `Buffer` and some metadata. - Thus, one should can serialize generic Rust types implementing the Serde `Serialize` and `Deserialize` traits into the buffer. - The `Buffer` itself is just a thin wrapper around either a Plasma object buffer or a process-local buffer. - I'm not too sure how type information is encoded, if this is encoded somewhere in an `RayObject`'s metadata, or if this can be handled solely by serialized tasks. - However, in the case where tasks do not have a hierarchical structure and an `ObjectRef` is passed outside of its driver's scope (ownership transfer), or even simply enters another programming context, the type guarantees might fail. - How do Java/C++ handle this issue...? - C++: msgpack deserialization type error = invalid argument. However, couldn't msgpack successfully deserialize a type that was originally a different type, thus leading to a silent error?

Roadmap

The following is a proposed roadmap for a first phase of work. The order of items is flexible, but the one below is more or less expected.

Notes:

  1. As was suggested, I have avoided planning for local mode. Implementation of this could be planned for the future if deemed necessary.

Structure of the C++ API

To have a guide on what it takes to implement a Ray API, we study the C++ API.

include:

- it consists of: - some types like exceptions (`RayException` and variants) - This is related to - `RayRuntime` virtual base class (why do we need to define it here?) - `ActorCreator` and `TaskCaller`/`ActorTaskCaller` which modify `ActorCreatorOptions` and `CallOptions` (as defined in task_options.h) respectively, - their return types `ActorHandle` (actor_handle.h) and `ObjectRef` (object_ref.h) - Serialization with `msgpack`. Still not too sure where serialization is used exactly. - There is also something to do with wrapping task args (arguments.h/common_types.h); I think I will figure this out for Rust as things go along. - There is also handling of function calling and a remote function registry? (function_manager.h), which is pretty language-specific and I would have to find out how to do for Rust as well. - Presumably, a newly compiled ray driver ought to be able to farm out tasks and actors to an existing cluster of workers by serializing tasks and actor classes in the form of shared object files. - If I'm not wrong, this is the approach taken by the C++ API. - As for circumventing this manual process, perhaps one could take advantage of boxing `dyn` somehow for dynamic traits and fns. - I can't say I know what's going on in type_traits.h - There is also logging and ray worker config. - As for the main `api.h` file: - This mainly consists of the user-facing API. - It also provides inline implementations that are wrappers around methods of the statically contained `RayRuntime`. This is the API's main entrypoint into the underlying `CoreWorker`.

As for the rest of the cpp/src/ray:

- This is easier said than done since interop is tricky, and one needs to translate concepts in one language to the other, notwithstanding `cxx`'s helpful translations. - Details: - `GlobalStateAccessor` is mostly just used for placement groups and ocassionally `Actor`, so it may not be an implementation priority. - `WorkerContext` is used to get `taskID`, next put index, and `JobID`. - The `CoreWorker` is used in the wrapped API for handling actors and object references. It is also used by the `TaskSubmitter` to submit tasks and create actors. - Implementing `TaskExecutor` is probably a more or less rust-specific problem. - Not sure why one converts a slice of Objects/Refs to a serialized buffer for `ExecuteTask`. - Not sure of the difference between `Invoke` and `ExecuteTask`. - `ObjectStore` and `TaskSubmitter` merely wrap relevant `CoreWorker` APIs. - misc: why do we need to declare this before use: `using ray::core::CoreWorkerProcess;`?? Shouldn't it be imported by default when including the header file...?

More specific questions

- How is the config fed into the `CoreWorker`/`CoreWorkerProcess`...? - One uses a static per-process config with a process-global accessor. - Launching a `CoreWorker` via `CoreWorkerProcess`? - This is done during `ray::init()`. It has handled by the `ProcessHelper::RayStart`, this starts the GCS accessor and the `CoreWorkerProcess`.

The Relevant Ray Components (e.g. CoreWorker)

The CoreWorker manages memory stores, cluster and service connections, and holds the WorkerContext. sends execution requests to a task execution service, which is an instrumented version of Boost's async executor. It also handles gRPC calls made by other processes in the Ray runtime, such as task submission.

The WorkerContext holds various metadata about the state of the given worker, including info about IDs, placement groups etc. It also includes the WorkerThreadContext which manages thread-specific state, such as tasks and placement groups.

The CoreWorkerProcess is essentially the runtime context for a CoreWorker that exposes global accessors for a given process. It can even spin up multiple workers.

RayFunction is an attempt to have a general function metadata in anticipation of language interop on Ray.


Implementation Considerations

- **Process-static accessors**: Currently, in the C++ API `ConfigInternal`, `RayRuntimeHolder` are static objects that are meant to act as global accessors for the given worker process. - One should think of the idiomatic way to do this in Rust. Very likely via `lazy_static!` or a variant. - The absolute neccesity of such static objects is unclear to me. In fact, it is somewhat strange that one should rely on such per-process static accessors when one can have multiple workers managed by the same process (according to `core_worker_process.h`) - Thinking on it more, it must be the case that the single driver and all of the workers living in a single process share the same config, `CoreWorker` and runtime. - Multiple workers per process are handled by a thread-local `WorkerID`; However, I am unsure how one spins up multiple thread-local workers per process. (I think the workflow is that the user-side code must spin up threads, which each declare their own thread-local `WorkerID` to a callback function). ```c++ /// If more than 1 worker is started, only the threads which invoke the /// `task_execution_callback` will be automatically associated with the corresponding /// worker. If you started your own threads and you want to use core worker APIs in these /// threads, remember to call `CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id)` /// once in the new thread before calling core worker APIs, to associate the current /// thread with a worker. You can obtain the worker ID via /// `CoreWorkerProcess::GetCoreWorker()->GetWorkerID()`. ``` - **Passing in config (and other input) variables to the `CoreWorker`**: - `cxx` actually offers an interesting pathway to do this. One can create an intermediate interoperable type, and then access it's fields from C++ shim code to instantiate the actual input C++ struct. - i.e. just have a wrapper `ConfigContainer` with C++ shim methods which allow updates to each of the fields. - The degree to which this is necessary is not clear atm. - The task execution callback passed by the C++ API is `TaskExecutor::ExecuteTask` - **Serialization.** Actually, Rust's serde can serialize to msgpack. - although messagepack achieves very good compression, it's slower than some alternatives - and I'm not sure if this impacts performance overall (probably not, but still good to think about). [some benchmarks - scroll to end for summary](https://blog.logrocket.com/rust-serialization-whats-ready-for-production-today/). - These characterizations and the given benchmark probably should be taken with a pinch of salt, however. - **Direct object buffer writing and sealing**: Exposing a new frontend API for this that reduces memory copies: - What specific use-case do you need this for? - Example: materialized record tuple batches in SQL...? - Perhaps you can build a Rust wrapper type that handles this - For instance: `PlasmaObject` - `let mut x = PlasmaObject::new();` - `let x_ref = x.seal(); // x is consumed` - Actually, the `CoreWorker` already exposes the following API; one just needs to wrap the `Buffer` class too: ```c++ Status CreateOwned(const std::shared_ptr &metadata, const size_t data_size, const std::vector &contained_object_ids, ObjectID *object_id, std::shared_ptr *data, bool created_by_worker, const std::unique_ptr &owner_address = nullptr, bool inline_small_object = true); // difference with `SealExisting`...? // answer: it also keeps track of references Status SealOwned(const ObjectID &object_id, bool pin_object, const std::unique_ptr &owner_address = nullptr); ``` Apparently you can implement dropping the owned object in the heap with `Box` as follows: ```rust // `Pair` owns resources: two heap allocated integers struct Pair(Box, Box); impl Pair { // This method "consumes" the resources of the caller object // `self` desugars to `self: Self` fn destroy(self) { // Destructure `self` let Pair(first, second) = self; println!("Destroying Pair({}, {})", first, second); // `first` and `second` go out of scope and get freed } } ```
stale[bot] commented 2 years ago

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public slack channel.

stale[bot] commented 2 years ago

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!

clarkzinzow commented 2 years ago

Reopening because I still really want to see this. 😄

iasoon commented 2 years ago

I would be interested in helping out here!

@jon-chuang is https://github.com/ray-project/ray/pull/21572 the most recent work that was done on this?

stale[bot] commented 1 year ago

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public slack channel.

stale[bot] commented 1 year ago

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!

crazyboycjr commented 1 year ago

What's the state of this PR? Will ray have rust binding or support in the future? I see a large PR https://github.com/ray-project/ray/pull/21572, but it is closed due to inactivity. I would love to see rust binding and rust workers in the future.

mklifo commented 1 year ago

Bump, Rust support would be awesome.

jon-chuang commented 1 year ago

It was determined that multi-language support for Ray is not a top priority as the Python interface is the most used, and ML applications which largely have python bindings are the top priority.

Generally, many applications can be handled by using python worker as a wrapper over the application code (either via bindings or by acting as the parent process over the application).

I do agree that this can introduce overhead, a C-worker seems like the most natural solution in this case, which the user can bind to their application.

So at the least, exposing a shared library for worker with C-ABI for the user to bind to is a good step without introducing maintenance burden on the Ray team.

What are your use-cases @crazyboycjr @mklifo ?

crazyboycjr commented 1 year ago

@jon-chuang jjyao has replied to me under #21572 and I'm totally fine with that answer.

I came up to search around and ask related stuff mainly because one day I felt it would be great to have a runtime that can conveniently dispatch my CPU-intensive jobs written in Rust across my cluster. Ray, apparently is the most suitable backend for this demand in my mind. I'm also asked by others quite a few times if there is any crate that can do this in the Rust ecosystem and the answer seem to be a 'no' for now.

I am open for any further discussion towards enabling Ray to the Rust ecosystem.

jon-chuang commented 1 year ago

The work in https://github.com/ray-project/ray/pull/21572 can be continued if you are interested and I can provide some guidance. Here is a previous plan to split up the work: image

jon-chuang commented 1 year ago

The most important part is producing the .so artifact. Here is the implementation for exposing C-FFI to the core_worker: https://github.com/jon-chuang/ray/blob/ce9b206dca518e740852d33630e502eaeec93fca/src/ray/core_worker/lib/c/c_worker.cc

crazyboycjr commented 1 year ago

I really appreciate your effort on this direction @jon-chuang ! Unfortunately, I personally won't have spare bandwidth in the foreseen future (a few weeks to maybe months). But I will take a deep look at it someday.

Question: Do you indicate that this 2350 loc + 1100 loc test + ~40K loc in #21572 will be reviewed by relevant Ray developers? That sounds somewhat awful to me... I'm a little worried whether the upstream would be happy to accept this huge PR.

jon-chuang commented 1 year ago
  • ~40K loc

This is only due to generated protobuf code. The only code to review is 2350 loc + 1100 loc test

luminoctum commented 1 year ago

I’m also like to see this happening. I share the same need, to parallelize my rust code using Ray. Not sure where to start helping.

zuston commented 1 year ago

Any update on this?

rkooo567 commented 1 year ago

There's no active progress/plan to support this feature now

zuston commented 1 year ago

There's no active progress/plan to support this feature now

Thanks.

npuichigo commented 9 months ago

It’s awesome if we can use rust ray together with huggingface/candle

jmakov commented 7 months ago

Too bad this isn't progressing.

errord commented 2 weeks ago

This is a non-urgent but important feature that we hope the ray team will consider.