ray-project / ray


[Bug] Placement group removal refinement #19937

Open rkooo567 opened 3 years ago

rkooo567 commented 3 years ago

Search before asking

Ray Component

Ray Core

What happened + What you expected to happen

Currently, when placement groups are destroyed, all of the relevant workers / tasks are fate-shared. At this point the raylet basically calls DisconnectClient and DestroyWorker, meaning it closes the socket and sends a SIGTERM.
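
For context, the user-visible effect of this fate sharing is roughly the following (a minimal sketch; the placement_group option on .options() and the exception type mirror the test posted later in this thread):

    import pytest
    import ray

    ray.init(num_cpus=2)

    # Create a placement group and schedule an actor into it.
    pg = ray.util.placement_group([{"CPU": 2}])
    assert pg.wait(10)

    @ray.remote(num_cpus=2)
    class A:
        def ping(self):
            return "ok"

    a = A.options(placement_group=pg).remote()
    assert ray.get(a.ping.remote()) == "ok"

    # Removing the placement group fate-shares the actor's worker: the raylet
    # disconnects its socket and sends SIGTERM, so subsequent calls fail.
    ray.util.remove_placement_group(pg)

    with pytest.raises(ray.exceptions.RayActorError):
        ray.get(a.ping.remote(), timeout=10)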

This approach has a couple of issues.

TODO

Versions / Dependencies

Master

Reproduction script

N/A

Anything else

No response

Are you willing to submit a PR?

rkooo567 commented 3 years ago

@clay4444 are you planning to look into this?

I think you can probably reproduce it by making the worker startup time very slow and removing the placement group while the workers are starting up.

clay4megtr commented 3 years ago

> @clay4444 are you planning to look into this?
>
> I think you can probably reproduce it by making the worker startup time very slow and removing the placement group while the workers are starting up.

Yeah, I will check this out

clay4megtr commented 3 years ago

Reproduction steps:

  1. Disable the worker prestart feature. You can do this in a hacky way (screenshots omitted).
  2. Add a delay to the worker process startup. NOTE: the delay must be random (screenshot omitted).
  3. Run this unit test:

    # Imports needed to run the test stand-alone (module paths may differ
    # slightly across Ray versions):
    import os
    import time

    import pytest

    import ray
    from ray._private.test_utils import (is_placement_group_removed,
                                         wait_for_condition)
    from ray._raylet import PlacementGroupID
    from ray.util.placement_group import PlacementGroup


    def test_remove_placement_group(ray_start_cluster):
        cluster = ray_start_cluster
        cluster.add_node(num_cpus=4)
        ray.init(address=cluster.address)

        # First try to remove a placement group that doesn't
        # exist. This should not do anything.
        random_group_id = PlacementGroupID.from_random()
        random_placement_group = PlacementGroup(random_group_id)
        for _ in range(3):
            ray.util.remove_placement_group(random_placement_group)

        # Removing a placement group right after it is
        # created should work.
        placement_group = ray.util.placement_group([{"CPU": 2}, {"CPU": 2}])
        assert placement_group.wait(10)

        ray.util.remove_placement_group(placement_group)

        wait_for_condition(lambda: is_placement_group_removed(placement_group))

        # Now let's create a placement group.
        placement_group = ray.util.placement_group([{"CPU": 2}, {"CPU": 2}])
        assert placement_group.wait(10)

        # Create an actor that occupies resources.
        @ray.remote(num_cpus=2)
        class A:
            def f(self):
                return 3

        # Currently, there's no way to prevent tasks from being retried
        # for a removed placement group.
        # Set max_retries=0 for testing.
        # TODO(sang): Handle this edge case.
        @ray.remote(num_cpus=2, max_retries=0)
        def long_running_task():
            print(os.getpid())
            time.sleep(50)

        # Schedule a long running task and actor.
        task_ref = long_running_task.options(
            placement_group=placement_group).remote()
        a = A.options(placement_group=placement_group).remote()
        assert ray.get(a.f.remote()) == 3

        ray.util.remove_placement_group(placement_group)
        # Subsequent remove requests shouldn't do anything.
        for _ in range(3):
            ray.util.remove_placement_group(placement_group)

        # Since the placement group is removed,
        # the actor should've been killed.
        # That means this request should fail.
        with pytest.raises(ray.exceptions.RayActorError, match="actor died"):
            ray.get(a.f.remote(), timeout=3.0)
        with pytest.raises(ray.exceptions.WorkerCrashedError):
            ray.get(task_ref)

Then you can easily reproduce the first issue (screenshot omitted).

clay4megtr commented 3 years ago

The reason we need the random delay in step 2 is to make sure the worker corresponding to long_running_task starts later than the worker corresponding to actor A.
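
For reference, the kind of delay meant in step 2 could look roughly like the following (a hypothetical sketch; where exactly it gets injected in the worker startup path is not part of the original report and depends on the Ray version):

    # Hypothetical hack for step 2: inject a random delay at worker process
    # startup so the workers for long_running_task and actor A come up in a
    # nondeterministic order. A fixed delay would preserve the startup order.
    import random
    import time

    def delay_worker_startup(max_delay_s: float = 5.0) -> None:
        time.sleep(random.uniform(0.0, max_delay_s))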