ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
32.13k stars 5.48k forks source link

[Core][Object Store] Push Manager: round for object manager client and FIFO for object #34270

Open Catch-Bull opened 1 year ago

Catch-Bull commented 1 year ago

Description

Regarding the round-robin algorithm of the push manager in our scenario:

I think there were too many invalid chunk transfers. The scheduling of normal tasks is prone to conflicts, resulting in a large number of waiting tasks in the waiting task queue of a node. When these tasks simultaneously pull objects, their argument preparation time becomes similar, and only a few tasks can be dispatched to workers smoothly, while other tasks will be spilled out, leading to a waste of all these pull requests.

Here is a simple test:

import ray
import numpy as np
import time
from tqdm import tqdm
from ray.cluster_utils import Cluster

SYSTEM_CONFIG = {
    # force argument to be put into object store
    "max_direct_call_object_size": 512,
    "object_manager_default_chunk_size": 25 * 1024,
    "object_manager_max_bytes_in_flight": 400 * 25 * 1024,
    "locality_aware_leasing_enabled": True,
}

cluster = Cluster()

cluster.add_node(
    object_store_memory=75 * 1024 ** 2,
    _system_config=SYSTEM_CONFIG,
    memory=16*1024**3,
    num_cpus=2,
)

ray.init(address="auto")

cluster.add_node(
    object_store_memory=75 * 1024 ** 2,
    memory=16*1024**3,
    num_cpus=2,
)

cluster.add_node(
    object_store_memory=75 * 1024 ** 2,
    memory=16*1024**3,
    num_cpus=2,
)

# N MB
def get_obj(size=5):
    return np.random.randint(low=255, size=size * 1024 * 1024, dtype=np.uint8)

get_obj_remote = ray.remote(memory=8*1024**3)(get_obj)

args1 = []
args2 = []
for _ in tqdm(range(100)):
    args1.append(get_obj_remote.remote())
    args2.append(get_obj_remote.remote())

@ray.remote(memory=8*1024**3)
def get_sum(data1, data2):
    print(data1.sum(), data2.sum())
    ray.put(get_obj(35))
    return True

refs = []
for data1, data2 in zip(args1, args2):
    refs.append(get_sum.remote(data1, data2))

for ref in tqdm(refs):
    print(ray.get(ref))

ray.shutdown()

Use case

round for object manager client and FIFO for object

prototype: https://github.com/ray-project/ray/pull/34269

rkooo567 commented 1 year ago

Hmm what's the proposal here? You are saying we should do FIFO chunk transfer instead of round robin?

rkooo567 commented 1 year ago

This can have issues like many tasks that require small objs cannot be scheduled because of a task that requires large objects?

Catch-Bull commented 1 year ago

This can have issues like many tasks that require small objs cannot be scheduled because of a task that requires large objects?

@rkooo567 Sorry, I missed your comment.. so I am replying late. Actually, our ultimate goal is to resolve this issue. The details of the current issue are on the PR, and we can discuss them on the PR.