ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core][Bug] global-scoped actor handles/Ray objects prevent Ray workers from being destroyed. #23677

Open scv119 opened 2 years ago

scv119 commented 2 years ago


Ray Component

Ray Core

Issue Severity

Medium: It contributes to significant difficulty to complete my task, but I can work around it and get it resolved.

What happened + What you expected to happen

Specifically, we have a single-core (single-worker) cluster and three different map executions that alternate compute models:

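```python
import ray

ray.init(num_cpus=1)  # Single CPU, so at most one worker runs at a time.
ds = ray.data.range(10)

class StatefulFn:
    ...  # Stateful callable; body elided in the original report.

# 1: task-based map
ds.flat_map(StatefulFn, compute="tasks").take()
# 2: actor-based map
ds.flat_map(StatefulFn, compute="actors").take()
# 3: task-based map again
ds.flat_map(StatefulFn, compute="tasks").take()
```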

where StatefulFn construction should be cached in the Python worker for the “tasks” compute model, and in the actor for the “actors” compute model. Before the change, #1, #2, and #3 would each create a new Python worker, so the StatefulFn wouldn’t be cached across those map executions, as expected. After the change, #1 and #3 somehow use the same Python worker and therefore reuse the cached StatefulFn construction, which is definitely unexpected. I’d expect the Python worker created in #1 to be destroyed once execution of #2 starts, so I’m not sure how the worker for #1 is getting reused for #3.
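To make the caching behavior concrete, here is a minimal pure-Python sketch, assuming the task worker keeps constructed callable classes in a per-process dictionary. `_fn_cache` and `get_cached_instance` are hypothetical names for illustration, not Ray internals, and the `StatefulFn` body is invented. Any two map executions that land in the same process hit the same cache entry, which is why #1 and #3 sharing a worker leads to reuse.

```python
# Hypothetical per-process cache: stands in for however a task worker
# caches callable-class construction between tasks (not Ray's actual code).
_fn_cache = {}


def get_cached_instance(cls):
    """Construct cls once per process and reuse it on later calls."""
    if cls not in _fn_cache:
        _fn_cache[cls] = cls()
    return _fn_cache[cls]


class StatefulFn:
    # Hypothetical body: counts constructions so the cache hit is observable.
    num_constructions = 0

    def __init__(self):
        StatefulFn.num_constructions += 1

    def __call__(self, x):
        # flat_map-style: one input record maps to a list of output records.
        return [x, x]


# "Map execution #1" and "#3" running in the same process.
first = get_cached_instance(StatefulFn)
third = get_cached_instance(StatefulFn)
```

Because `first is third`, any state built in `__init__` during #1 is visible in #3; only a fresh process (or an explicitly cleared cache) would force a new construction.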

For non-actor workers, global object references are never force-removed, since they are assumed to be held by other processes. A named actor handle stored at global scope in such a worker is therefore expected to keep the worker alive forever.
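At the Python level, the lifetime problem is ordinary scoping: a module-level variable outlives the task that set it. The sketch below shows only that part; `FakeActorHandle` is a hypothetical stand-in for a real actor handle, and the code does not model Ray's reference counting.

```python
import weakref


class FakeActorHandle:
    """Hypothetical stand-in for an actor handle; holds no real resources."""


# Module-level (global) cache, mirroring the pattern discussed in this issue.
handle = None


def get_handle():
    global handle
    if handle is None:
        handle = FakeActorHandle()
    return handle


def task():
    # The handle is only needed inside the task body...
    get_handle()
    return "done"


task()
# ...but after the task returns, the module global still references it.
handle_ref = weakref.ref(handle)
alive_after_task = handle_ref() is not None

# Only dropping the global releases it (CPython frees it immediately
# via reference counting once the last strong reference is gone).
handle = None
alive_after_clear = handle_ref() is not None
```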

Versions / Dependencies

Latest Ray

Reproduction script

N/A

Anything else

No response


scv119 commented 2 years ago

References that are captured in a remote function or class definition will be pinned permanently. For example:

```python
import ray

x_ref = foo.remote()  # foo is a remote function defined elsewhere.

@ray.remote
def capture():
    ray.get(x_ref)  # x_ref is captured. It will be pinned as long as the driver lives.
```

One thing I don't fully understand is why we can't release the reference (since the worker is only a borrower) after `capture` finishes executing. cc @stephanie-wang
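As a rough Python analogy (not Ray's mechanism, which serializes the captured reference along with the function definition), a closure keeps whatever it captures alive for as long as the function object itself exists. `FakeObjectRef` and `define_capture` are hypothetical names for illustration.

```python
import weakref


class FakeObjectRef:
    """Hypothetical stand-in for a Ray ObjectRef."""


def define_capture():
    x_ref = FakeObjectRef()

    def capture():
        # x_ref is captured in capture's closure cell.
        return x_ref

    return capture, weakref.ref(x_ref)


capture, x_ref_weak = define_capture()
pinned_while_defined = x_ref_weak() is not None

# Deleting the function drops its closure, and with it the captured x_ref.
del capture
pinned_after_del = x_ref_weak() is not None
```

In the Ray case the "function object" is the registered remote function definition, which lives as long as the driver, so the captured reference stays pinned for that long.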

rkooo567 commented 2 years ago

@clarkzinzow

> global-scope named actor handle

Can you show me an example of what this means?

clarkzinzow commented 2 years ago

@rkooo567 Sure!

```python
import ray

# Actor handle lives at global scope.
handle = None

def _get_named_actor():
    global handle

    if handle is None:
        handle = ray.get_actor("adder")
    return handle

@ray.remote
class Adder:
    def __init__(self, x):
        self.x = x

    def add(self, y):
        return self.x + y

@ray.remote
def add(y):
    # This global actor handle will be set in a non-driver worker.
    adder = _get_named_actor()
    return ray.get(adder.add.remote(y))

adder = Adder.options(name="adder").remote(1)
results = [add.remote(y) for y in range(10)]
results = ray.get(results)
assert results == list(range(1, 11))
```

stephanie-wang commented 2 years ago

Yeah, it does seem like this is a bug. The reference should be a "borrower", and the worker is also supposed to force-remove its local ref counts upon exit.

clarkzinzow commented 2 years ago

@stephanie-wang Btw, I think it may actually be a borrower as expected, but I have to double-check. It looks like the actor manager might always consider fetched named actor handles to be "detached", in which case the handle wouldn't be an owned reference, and ownership isn't the underlying issue.

ericl commented 2 years ago

This might be a fundamentally hard issue to fix, since global references are indistinguishable from references caught in transient GC cycles.

I don't see how you can solve this without adding `gc.collect()` calls after every task execution, which would be prohibitively expensive.
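A pure-Python sketch of this point: right after a task returns, an object kept alive by a transient reference cycle looks the same as one kept alive by a global, since both still have live references. Only a full `gc.collect()` separates them. `FakeHandle` and the two task functions are hypothetical, and this illustrates CPython's collector, not Ray's reference counter.

```python
import gc
import weakref


class FakeHandle:
    """Hypothetical stand-in for an actor handle or ObjectRef."""


_global_handle = None


def task_with_cycle():
    h = FakeHandle()
    h.self_ref = h  # transient reference cycle: refcount never hits zero
    return weakref.ref(h)


def task_with_global():
    global _global_handle
    _global_handle = FakeHandle()  # deliberately kept at global scope
    return weakref.ref(_global_handle)


gc.disable()  # keep the automatic collector from running mid-example
try:
    cycle_ref = task_with_cycle()
    global_ref = task_with_global()
    # Right after the "tasks" return, both objects are still alive.
    both_alive_before_gc = cycle_ref() is not None and global_ref() is not None
    gc.collect()  # the expensive step that would be needed after each task
    cycle_alive_after_gc = cycle_ref() is not None
    global_alive_after_gc = global_ref() is not None
finally:
    gc.enable()
```

Before the collection pass, nothing distinguishes the cyclic garbage from the deliberately retained global; running a pass after every task is what would make that distinction possible, at the cost noted above.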