ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core][Object Store][Feature] Fair object store sharing across jobs running in the same node/cluster #20146

Open scv119 opened 2 years ago

scv119 commented 2 years ago

Search before asking

Description

In Ray, there can be multiple worker processes on each computing node (e.g. one physical machine) sharing the same Raylet and Object Store. At the moment, there appears to be no restriction on how much memory in the shared store each worker process can consume. This can be a challenge for some workloads, as our users report.

Ideally, there should be some restriction per worker process (or per task) to ensure that the shared Object Store is used fairly across worker processes, preventing aggressive consumption by one or a few worker processes that would force other worker processes to spill and take a performance hit.

There is something similar in Spark's execution memory management, where each of N concurrent tasks is restricted to between 1/(2N) and 1/N of the shared execution memory before spilling (https://0x0fff.com/spark-memory-management/). This might be something we can refer to for Ray, or we can come up with a more performant restriction mechanism. Any thoughts?
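For illustration, a minimal sketch of that bound, assuming N concurrently running tasks sharing one pool; the function name is made up, and this is not Spark or Ray code:

```python
# Illustrative only: with N active tasks sharing a memory pool, each task is
# guaranteed at least 1/(2N) of the pool and capped at 1/N before it must spill.

def task_share_bounds(pool_bytes: int, num_active_tasks: int) -> tuple[int, int]:
    """Return (guaranteed_min_bytes, cap_bytes) for one task under 1/(2N)..1/N sharing."""
    if num_active_tasks <= 0:
        raise ValueError("need at least one active task")
    return pool_bytes // (2 * num_active_tasks), pool_bytes // num_active_tasks

# Example: a 1 GiB pool shared by 4 tasks gives each task a 128 MiB floor
# and a 256 MiB ceiling before spilling.
print(task_share_bounds(1 << 30, 4))  # (134217728, 268435456)
```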

Use case

N/A

Related issues

N/A

Are you willing to submit a PR?

superhaiou commented 2 years ago

Things we could do are as follows:

1) "That’s definitely doable in Ray. This could be implemented as a thin layer over plasma store to do quota/admission control. The main challenge is that a Ray object could be shared between different workers so it’s a bit tricky to decide the owner of the objects."

2) "we had a legacy feature that could be used to set quotas per actor. However, we removed this code a few months ago since it was never really used. Maybe we could modernize this with "fair spilling" (e.g., preferentially spill objects based on some criteria like per-worker usage). Right now, object spilling is spilling randomly chosen objects."

3) "Ray doesn't differentiate between execution and storage memory right now (nor do I think this really makes sense in the object store model). We could do something like fair sharing across jobs running in the same cluster though. Then say there's a job holding cached objects, and another shuffle job running, the cache would be guaranteed at least 50% of the object store under memory pressure."

superhaiou commented 2 years ago

We'd need to figure out a reasonable fairness criterion. BTW, the above 3 items actually cover 2 tasks:

1) Support object store restriction per worker process, using some sort of fairness algorithm. Here we can come up with generic policies, or let users configure the weights of object store consumption per worker, task, or actor.

2) Object store restriction between object caching and other executions such as shuffling.

The above policies can be turned off by default, and users can turn them on if needed. This feature will be useful when memory is under pressure, e.g. when Ray is used to replace Map/Reduce workloads.

scv119 commented 2 years ago

also cc @rkooo567 @ericl @stephanie-wang @clarkzinzow

superhaiou commented 2 years ago

The feature will be useful when memory is under pressure. For example, we are leveraging Ray to replace Map/Reduce tasks, and memory usage is significant: both CPU and memory are at 100% utilization all the time, so other tasks are queued for scheduling. Memory restriction per task/actor is essential for our scenario.

superhaiou commented 2 years ago

More specifically, our use case is offline index building for web pages. We are hoping that the Object Store can be well leveraged (for pushing objects per task and for multi-task intermediate output).

stephanie-wang commented 2 years ago

Hmm just to clarify, is the problem only about multitenant nodes/clusters, or is it also for worker processes in the same job? It's not clear to me that performance is a problem if the worker processes are all in the same job, as long as the total memory and spill bandwidth are maximized.

superhaiou commented 2 years ago

> Hmm just to clarify, is the problem only about multitenant nodes/clusters, or is it also for worker processes in the same job? It's not clear to me that performance is a problem if the worker processes are all in the same job, as long as the total memory and spill bandwidth are maximized.

We are dynamically launching a Ray cluster per job, and there is a possibility that one task in a worker process pushes too much into the Object Store while other tasks needing smaller pushes cannot move forward (spilling would be required for the other tasks even with small pushes), which is not fair.

superhaiou commented 2 years ago

Since users can estimate the Object Store pushes for each task, one approach would be to allow users to set a quota/weight for each type of task so that Object Store occupancy becomes much fairer by respecting the weights (by default each task has the same weight). In addition, for spilling, we shouldn't spill randomly and should consider policies that account for fairness.

scv119 commented 2 years ago

For the mechanism, this looks trivial: whenever a client wants to create a new object in the plasma store, we can consult an admission control component to allow or reject the request. For the policy, we need to decide all the options for determining the allowance for each task/worker. Some ideas I have (or that were inspired by this thread):
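As a rough illustration of the mechanism described here (not of the policy options, which remain open), an admission check consulted on every object creation could look like the following sketch; the class, its equal-split policy, and all names are hypothetical, not Ray APIs:

```python
# Hypothetical admission-control layer over object creation (illustrative only).
class AdmissionController:
    def __init__(self, pool_bytes: int):
        self.pool_bytes = pool_bytes
        self.used_by_client: dict[str, int] = {}

    def allowance(self, client_id: str) -> int:
        # Simplest possible policy: split the pool equally across all clients
        # seen so far, including the one asking now.
        clients = set(self.used_by_client) | {client_id}
        return self.pool_bytes // len(clients)

    def try_create(self, client_id: str, object_bytes: int) -> bool:
        """Return True if the object may be created, False if the request is rejected."""
        used = self.used_by_client.get(client_id, 0)
        if used + object_bytes > self.allowance(client_id):
            return False
        self.used_by_client[client_id] = used + object_bytes
        return True

# Example: two clients sharing a 100-byte store each get a 50-byte allowance.
ac = AdmissionController(pool_bytes=100)
print(ac.try_create("worker_a", 40))   # True
print(ac.try_create("worker_b", 60))   # False: exceeds worker_b's 50-byte share
```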

stephanie-wang commented 2 years ago

For mechanism, I think the key question is how to reject requests without deadlocking. What happens if that client needs the object to continue? One simple option we could try is, instead of "rejecting" the request, we force the client to spill.

But I think I am still not super clear on the application setting. I'm not sure if fairness is enough to guarantee that small objects would get prioritized for example. @superhaiou, can you say more about your application setting? Or having some code would also be helpful.
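A minimal sketch of the "force the client to spill" variant, assuming a hypothetical callback that spills one of the requesting client's objects and reports how many bytes were freed:

```python
from typing import Callable

def admit_after_spilling(
    used_bytes: int,
    allowance_bytes: int,
    new_object_bytes: int,
    spill_one: Callable[[], int],  # hypothetical: spill one of this client's objects, return freed bytes
) -> int:
    """Instead of rejecting, spill the client's own objects until the new object fits.

    Returns the client's usage after admitting the new object. The client can
    always make progress, which avoids the deadlock a hard rejection could cause.
    """
    while used_bytes + new_object_bytes > allowance_bytes:
        freed = spill_one()
        if freed <= 0:  # nothing left to spill; stop and admit (or escalate elsewhere)
            break
        used_bytes -= freed
    return used_bytes + new_object_bytes
```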

superhaiou commented 2 years ago

> For mechanism, I think the key question is how to reject requests without deadlocking. What happens if that client needs the object to continue? One simple option we could try is, instead of "rejecting" the request, we force the client to spill.
>
> But I think I am still not super clear on the application setting. I'm not sure if fairness is enough to guarantee that small objects would get prioritized for example. @superhaiou, can you say more about your application setting? Or having some code would also be helpful.

I think we cannot reject requests; instead we should spill. Today we are dynamically launching a Ray cluster per job. If a reducer pushes too much output to the Object Store, it blocks other mappers, reducers, or future shufflers from pushing their outputs, even if their outputs are far smaller than the reducer's, which is not fair. In addition, we'll have to use multi-tenancy in a long-running Ray cluster for some online services, though we are working on offline indexing right now; therefore we should design the mechanism and policies for multi-tenancy scenarios as well. I think we should allow users to set a hard limit or some weight. Since we have to spill anyway, shall we come up with fair spilling policies across worker processes as opposed to spilling randomly? Thanks.

superhaiou commented 2 years ago

For the application setting, say we are using a dynamically launched Ray cluster with only two types of tasks: mapper and reducer. We know that a reducer outputs far more data to the Object Store than a mapper does, say 5 times as much. We'd like to set the weight of mappers to 1 and the weight of reducers to 2 (it would effectively be 5 without any change), so that the reducers' outputs do not impact the mappers' intermediate outputs too much. Note that the reducers' outputs should have lower priority than the mappers', so spilling that data is fine. Thanks.
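A worked sketch of this setting (illustrative numbers and names only): with mapper weight 1 and reducer weight 2, the store is split 1:2 under pressure rather than the roughly 1:5 split that would emerge from the raw output sizes.

```python
# Illustrative weighted split of the object store across task types.
def weighted_shares(pool_bytes: int, weights: dict[str, int]) -> dict[str, int]:
    """Return each task type's share of the store, proportional to its weight."""
    total = sum(weights.values())
    return {kind: pool_bytes * w // total for kind, w in weights.items()}

# Example: a 30 GiB object store with mapper weight 1 and reducer weight 2
# gives mappers ~10 GiB and reducers ~20 GiB before their objects are spilled.
print(weighted_shares(30 * (1 << 30), {"mapper": 1, "reducer": 2}))
```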

ericl commented 2 years ago

Yeah, I agree that you can achieve fairness at the spilling level only. For example, this could be achieved by blocking the request until we have spilled enough objects created by the client. Or, we can admit all requests and only adjust the spilling policy to prefer spilling objects from over-quota clients. The latter would be much simpler but provides weaker QoS guarantees.
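A minimal sketch of the latter option, assuming the spill policy can see each object's owner along with per-owner usage and quota (all names are hypothetical):

```python
from typing import Optional

def pick_spill_victim(
    objects: list[tuple[str, str, int]],     # (object_id, owner_id, size_bytes)
    usage_by_owner: dict[str, int],
    quota_by_owner: dict[str, int],
) -> Optional[str]:
    """Prefer spilling objects owned by the client that is furthest over its quota."""
    def overage(owner: str) -> int:
        return usage_by_owner.get(owner, 0) - quota_by_owner.get(owner, 0)

    over_quota = [obj for obj in objects if overage(obj[1]) > 0]
    candidates = over_quota or objects   # fall back to any object if no one is over quota
    if not candidates:
        return None
    # Among candidates, pick the largest object of the most-over-quota owner.
    object_id, _, _ = max(candidates, key=lambda obj: (overage(obj[1]), obj[2]))
    return object_id

# Example: owner "shuffle" is 20 bytes over quota and "cache" is under, so a
# shuffle-owned object is chosen even though the cache object is larger.
objects = [("o1", "cache", 50), ("o2", "shuffle", 30), ("o3", "shuffle", 10)]
print(pick_spill_victim(objects, {"cache": 50, "shuffle": 40}, {"cache": 60, "shuffle": 20}))  # o2
```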

scv119 commented 2 years ago

Better late than never: we have had some internal discussions and done some prototypes/experiments. So far we are thinking of solving the problem by:

  1. prioritizing which tasks to execute first;
  2. deciding which task to block (this includes spilling the objects generated by that task and blocking new objects from being created by that task).

Also, the following is a more formal system model:

System model