ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

Support resizing placement groups #16403

Open clay4megtr opened 3 years ago

clay4megtr commented 3 years ago

Describe your feature request


Hi all, I'd like to add a resizing feature for placement groups, as we discussed before. The API would look like this:

import pytest

import ray
# wait_for_condition is Ray's test helper; its module path varies by version.
from ray._private.test_utils import wait_for_condition

MB = 1024 * 1024


# ray_start_cluster is the pytest fixture provided by Ray's test suite.
def test_placement_group_add_bundles_api_basic(ray_start_cluster):
    @ray.remote(num_cpus=2)
    class Actor(object):
        def __init__(self):
            self.n = 0

        def value(self):
            return self.n

    cluster = ray_start_cluster
    cluster.add_node(num_cpus=4)
    ray.init(address=cluster.address)

    # Create an infeasible placement group first.
    infeasible_placement_group = ray.util.placement_group(
        name="name", strategy="PACK", bundles=[{
            "CPU": 8
        }])
    assert not infeasible_placement_group.wait(4)
    # Make sure the add bundles request will fail since it is pending now.
    with pytest.raises(
            ray.exceptions.RaySystemError,
            match="the placement group is in scheduling now"):
        infeasible_placement_group.add_bundles([{"CPU": 2, "memory": 50 * MB}])

    # Remove the infeasible placement group.
    ray.util.remove_placement_group(infeasible_placement_group)

    def is_placement_group_removed():
        table = ray.util.placement_group_table(infeasible_placement_group)
        if "state" not in table:
            return False
        return table["state"] == "REMOVED"

    wait_for_condition(is_placement_group_removed)

    # Create a feasible placement group now.
    placement_group = ray.util.placement_group(
        name="name", strategy="PACK", bundles=[{
            "CPU": 2
        }])

    # Wait for the placement group to create successfully.
    assert placement_group.wait(5)

    placement_group.add_bundles([{"CPU": 2}])
    table = ray.util.placement_group_table(placement_group)
    assert len(list(table["bundles"].values())) == 2
    assert table["state"] == "CREATED"

    # Wait for the add new bundles operation to finish.
    assert placement_group.wait(5)

    # Schedule an actor through the new bundle index.
    actor = Actor.options(
        placement_group=placement_group,
        placement_group_bundle_index=1).remote()

    ray.get(actor.value.remote())

BTW, you can also replace the wait(timeout) API with the ready() API if you want.
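
For reference, here is a minimal sketch of the ready()-based alternative mentioned above, using only the existing ray.util.placement_group API; only the readiness check differs from wait(timeout):

import ray

ray.init()

pg = ray.util.placement_group(
    name="name", strategy="PACK", bundles=[{"CPU": 2}])

# pg.ready() returns an ObjectRef that becomes available once the placement
# group is created, so a timeout-bounded check looks like this:
ready_refs, _ = ray.wait([pg.ready()], timeout=5)
assert ready_refs, "placement group was not created within 5 seconds"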

Also, the related proposal document is here: API proposal

Please leave a comment if you have any requirements or questions. Thanks!

rkooo567 commented 3 years ago

@edoakes @simon-mo @krfricke @richardliaw Please comment on the API!

clay4megtr commented 3 years ago

Hi @edoakes @simon-mo @krfricke @richardliaw, are there any questions about the API?

edoakes commented 3 years ago

@clay4444 what is the API to remove a bundle from the placement group? Also, what are the semantics here -- are all tasks/actors running in it killed?

clay4megtr commented 3 years ago

@edoakes First question: the remove bundles API is similar to the add API. An example:

# Same imports and fixtures as the add-bundles test above.
def test_placement_group_remove_bundles_api_basic(ray_start_cluster):
    @ray.remote(num_cpus=2)
    class Actor(object):
        def __init__(self):
            self.n = 0

        def value(self):
            return self.n

    cluster = ray_start_cluster
    cluster.add_node(num_cpus=4)
    ray.init(address=cluster.address)

    # Create an infeasible placement group first.
    infeasible_placement_group = ray.util.placement_group(
        name="name",
        strategy="PACK",
        bundles=[{
            "CPU": 2,
        }, {
            "CPU": 4,
        }])
    assert not infeasible_placement_group.wait(3)
    # Make sure the remove bundles request will fail since it is pending now.
    with pytest.raises(
            ray.exceptions.RaySystemError,
            match="the placement group is in scheduling now"):
        infeasible_placement_group.remove_bundles([1])

    # Remove the infeasible placement group.
    ray.util.remove_placement_group(infeasible_placement_group)

    def is_placement_group_removed():
        table = ray.util.placement_group_table(infeasible_placement_group)
        if "state" not in table:
            return False
        return table["state"] == "REMOVED"

    wait_for_condition(is_placement_group_removed)

    # Create another placement group that can be scheduled.
    placement_group = ray.util.placement_group(
        name="name",
        strategy="PACK",
        bundles=[{
            "CPU": 2,
        }, {
            "CPU": 2,
        }])
    # Wait for its creation to finish.
    assert placement_group.wait(5)

    # Schedule a normal task so the core worker
    # registers a bundle-change listener.
    @ray.remote(num_cpus=2, memory=50 * MB)
    def dummy_task():
        return True

    ray.get(
        dummy_task.options(
            placement_group=placement_group,
            placement_group_bundle_index=0).remote())

    # Remove the second bundle.
    placement_group.remove_bundles([1])
    assert placement_group.wait(5)

    # Validate the metadata information.
    table = ray.util.placement_group_table(placement_group)
    assert len(list(table["bundles"].values())) == 2
    assert table["state"] == "CREATED"
    assert table["bundles_status"][0] == "VALID"
    assert table["bundles_status"][1] == "INVALID"

    # Make sure the actor creation will fail as the second bundle has
    # been removed. We need this test to make sure the core worker will
    # receive the bundles changed event and refresh the local bundles view.
    with pytest.raises(
            ray.exceptions.RaySystemError, match="Invalid bundle index"):
        Actor.options(
            placement_group=placement_group,
            placement_group_bundle_index=1).remote()

Second question: yes, we will kill all actors and tasks associated with the removed bundle.
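
To make the proposed kill semantics concrete, here is a hedged sketch continuing the remove-bundles test above (remove_bundles and its behavior are part of this proposal, not the released Ray API; RayActorError is the usual error for a dead actor):

    # Schedule an actor into bundle 1 while it is still valid.
    actor = Actor.options(
        placement_group=placement_group,
        placement_group_bundle_index=1).remote()
    ray.get(actor.value.remote())  # works while bundle 1 exists

    # Remove bundle 1; under the proposed semantics the actor in it is killed,
    # so a subsequent call on its handle is expected to fail.
    placement_group.remove_bundles([1])
    with pytest.raises(ray.exceptions.RayActorError):
        ray.get(actor.value.remote())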

edoakes commented 3 years ago

@clay4444 what is the metadata available in the placement_group_table? Is it just the state and bundles_status? For example, one important feature for Ray Serve (and likely other applications) will be to target a specific placement group that Serve has already shut down the actors in, or a placement group on a specific node.

clay4megtr commented 3 years ago

> @clay4444 what is the metadata available in the placement_group_table? Is it just the state and bundles_status? For example, one important feature for Ray Serve (and likely other applications) will be to target a specific placement group that Serve has already shut down the actors in, or a placement group on a specific node.

Hmm, I'm not sure I understood you correctly. Is the first question: after an actor is killed, how do you find the specific placement group bundle that the original actor was located in? If so, I think you should save the relationship between actors and bundle indexes on the driver side; then you can always look up the specific bundle.

As for the second question, I remember we discussed it earlier. With the current API, you can first get the bundle info (including the bundle index and node info) and then remove bundles by bundle index. To simplify this, we could encapsulate it into a new API if needed.
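
To illustrate the driver-side bookkeeping idea, here is a minimal sketch that reuses the Actor class and placement_group from the tests above (remove_bundles is the API proposed in this issue, not an existing Ray call):

    # Map bundle index -> actor handle so the driver always knows which
    # bundle an actor occupies.
    bundle_index_to_actor = {}
    for index in range(2):
        bundle_index_to_actor[index] = Actor.options(
            placement_group=placement_group,
            placement_group_bundle_index=index).remote()

    # Later, when the actor in bundle 1 is no longer needed, shut it down
    # and release its bundle (proposed API).
    ray.kill(bundle_index_to_actor.pop(1))
    placement_group.remove_bundles([1])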

clay4megtr commented 3 years ago

If there are no big problems with this API (aside from some detailed questions), can I start submitting the PR? Mars and Streaming also want to use this feature, and in fact they are already using it in our internal version. @raulchen @rkooo567 @edoakes

edoakes commented 3 years ago

@clay4444 do you still plan to implement this?

TanjaBayer commented 2 years ago

We mainly use Ray Serve in our deployments. So far we have found that placement groups are not currently supported for Ray Serve, which seems to be related to this feature. Let me quickly explain our needs, since they may have an impact on this feature. Right now we have no control over where Ray Serve places the deployments (except by using custom resources); however, there are two different cases which can occur:

The spread strategy, however, is the more important one for us.

clay4megtr commented 2 years ago

Hi @TanjaBayer, thanks for the comment! But I think the two problems you mentioned aren't related to this issue, right? There are two ways to solve the problems you mention if you use placement groups directly (see the sketch after the list below):

  1. Create a placement group that uses the SPREAD strategy, then put each deployment you want spread across different nodes into its own bundle, and put the deployments you want packed onto the same node into a single shared bundle. That way you achieve both goals at the same time.
  2. Create two placement groups, one using the SPREAD strategy and another using the PACK strategy. Then you can achieve what you want by putting deployments into the corresponding placement group. (But note: with this approach, the deployments using the PACK strategy may still be scheduled onto a node that already hosts a deployment from the SPREAD group!)
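
A rough sketch of option 1, using only existing Ray APIs; the Worker actor and resource numbers are illustrative and not tied to Serve internals:

import ray

ray.init()

pg = ray.util.placement_group(
    strategy="SPREAD",
    bundles=[
        {"CPU": 1},  # bundle 0: first spread deployment
        {"CPU": 1},  # bundle 1: second spread deployment
        {"CPU": 2},  # bundle 2: deployments that should be packed together
    ])
ray.get(pg.ready())

@ray.remote(num_cpus=1)
class Worker:
    pass

# Deployments that must land on different nodes get their own bundles.
spread_a = Worker.options(
    placement_group=pg, placement_group_bundle_index=0).remote()
spread_b = Worker.options(
    placement_group=pg, placement_group_bundle_index=1).remote()

# Deployments that should be co-located share the third bundle.
packed_a = Worker.options(
    placement_group=pg, placement_group_bundle_index=2).remote()
packed_b = Worker.options(
    placement_group=pg, placement_group_bundle_index=2).remote()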

Actually, I don't know exactly how Ray Serve uses placement groups, but I think it would be easy to achieve what you want if Ray Serve supported customizing the relationship between deployments via placement groups. cc @edoakes @simon-mo

simon-mo commented 2 years ago

Ah, to add context here: in Ray Serve we need resizable placement groups because the number of actors can change as we scale up and down.
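
For example, an autoscaling controller like Serve's would need roughly the following flow, assuming the add_bundles / remove_bundles API proposed in this issue (neither call exists in released Ray; the Replica actor is illustrative):

import ray

ray.init()

@ray.remote(num_cpus=1)
class Replica:
    def handle(self, request):
        return request

pg = ray.util.placement_group(strategy="PACK", bundles=[{"CPU": 1}])
ray.get(pg.ready())
replicas = {0: Replica.options(
    placement_group=pg, placement_group_bundle_index=0).remote()}

# Scale up: grow the placement group by one bundle (proposed API),
# then start a new replica inside it.
pg.add_bundles([{"CPU": 1}])
new_index = len(replicas)
replicas[new_index] = Replica.options(
    placement_group=pg, placement_group_bundle_index=new_index).remote()

# Scale down: stop the replica first, then release its bundle (proposed API).
ray.kill(replicas.pop(new_index))
pg.remove_bundles([new_index])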

edoakes commented 2 years ago

Yup, this is blocked on being able to resize for Serve!

rkooo567 commented 2 years ago

I think we can start working on adopting this feature once all of the existing P1s are closed. ETA is 2-4 weeks (at that point, we can say the existing placement group support is fairly stable). A couple of acceptance criteria:

anovv commented 1 year ago

Hi @ericl, @rkooo567 what's the status on this?