ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.4k stars 5.66k forks source link

[core] Support detached/GCS owned objects #12635

Open ericl opened 3 years ago

ericl commented 3 years ago

Overview

In the current ownership model, all created objects are owned by some worker executing the job. This means however that when a job exits, all created objects become unavailable.

In certain cases it is desirable to share objects between jobs (e.g., shared cache), without creating objects explicitly from a detached actor.

We have a current internal API to allow the owner of an object created with ray.put() to be assigned to a specific actor, e.g.:

ray.put(data, _owner=actor_handle)

This means the object created will fate-share with that actor instead of the current worker process. This means that it's currently possible to create global objects by creating a named detached actor, and setting the owner to that actor.

However, it would be nice to support ray.put(data, _owner="global") to avoid the need for that hack, and allow the object to be truly HA (e.g., tracked durably in HA GCS storage).

fishbone commented 3 years ago

I have a concern about this method. In this way, it looks like we treat the global storage as a key-value (with schema) storage and after running ray.util.put_global_ref(obj_ref, lookup_key="objfoo1") we store objfoo1 -> obj_ref in the storage which might leads to memory leak. It looks like makes memory management complicated. It's different from the actor which when it terminates it'll release all the resources. Besides if we do it in this way, we probably want to introduce namespace to avoid conflict and to prevent some object from being accidentally being updated by other apps.

If we have a global actor per group of task, and it's served for this purpose, will it just work? Will it be simpler? Do we need to support move ownership for this?

ericl commented 3 years ago

I agree binding the lifetime of an object to an actor might be the way to go. The way I see it there are probably a few use cases: (1) pin object permanently, manual deletion (2) allow object to be evicted if under memory pressure (caching case) (3) bind lifetime of object to another actor/task (similar to ownership transfer)

Perhaps we could create the following API? Perhaps we don't need to give objects an explicit name, just allow their lifetime to be changed:

ray.util.set_object_lifetime(obj_ref, lifetime="detached"|"lru"|<actorRef>)

# can only implement <actorRef> case initially

Btw, actually doing ownership transfer is pretty hard, since the owner metadata is propagated inline with the object reference. You'd need to add some kind of transfer table to handle that. Assuming the number of objects that require transfer is quite small, it would be easier to instead only allow ownership to be transferred to the GCS. This is simpler since it means owner lookups can always fall back to GCS on owner failure.

fishbone commented 3 years ago

I think my concern here is two things: 1) how to manage all the detached variables; 2) how to do better isolation between groups of jobs.

how to manage all the detached variables We plan to make ray run in a cluster, and developers can submit their job to the cluster. If someone created many detached variables by using (1), it'd prevent other jobs from running due to OOM. And it's not easy to recover from this since we don't have extra info about all these detached variables. Besides, if we do ray.util.set_object_lifetime(obj_ref, lifetime="detached"|"lru"|<actorRef>) how can another job get the obj_ref? It should be a variable shared between jobs, but if one job exit, how can it be transferred from one to another?

Here I assume one job to be a .py file and will be executed by ray submit job.py or python job.py. Please let me know if I'm wrong.

how to do better isolation between groups of jobs If we use the variable name to do communication between jobs (if I understand the concept of the job correctly), the variable name will be a global variable visible to all jobs in the cluster. If we have a series of jobs and run them with different inputs, there will be conflicts.

I thought of introducing job groups of something else, but then it looks like one driver job with several sub-jobs. So sub-jobs(separate py file) can run in the driver job (main py file), and the driver job will have a reference of the shared data, which will work.

I think something is missing here in my brain. Please share more details about this.

ericl commented 3 years ago

Another user ask in https://github.com/ray-project/ray/issues/12748

ericl commented 3 years ago

If someone created many detached variables by using (1), it'd prevent other jobs from running due to OOM. And it's not easy to recover from this since we don't have extra info about all these detached variables.

This isn't something we need to worry about though--- if the user pins too much, it's their problem. With an autoscaling cluster, we can also spill / add more nodes to handle this case.

Besides, if we do ray.util.set_object_lifetime(obj_ref, lifetime="detached"|"lru"|) how can another job get the obj_ref? It should be a variable shared between jobs, but if one job exit, how can it be transferred from one to another?

One way is that the object refs can be transferred through a named actor. Perhaps we can try this to see if it satisfies use cases before adding another API.

ericl commented 3 years ago

I thought of introducing job groups of something else, but then it looks like one driver job with several sub-jobs. So sub-jobs(separate py file) can run in the driver job (main py file), and the driver job will have a reference of the shared data, which will work.

Btw, in the RayDP/Spark on Ray use case, the variable just needs to be transferred within the job, so there isn't the use case of multiple jobs yet. For caching use cases, the variables can be transferred to named actors global across many jobs. So I think changing lifetime is sufficient for both these cases, but perhaps there are others that require more functionality.

fishbone commented 3 years ago

I think I misunderstood some parts. So let me just summarize it here and please let me know if I was wrong:

Another thing I might want to be clarified here is what's a job? Is it a python script? Some demo code about what needs to be achieved should be helpful for me to understand it.

This isn't something we need to worry about though--- if the user pins too much, it's their problem. With an autoscaling cluster, we can also spill / add more nodes to handle this case.

I'm thinking that it's a cluster and multiple users will want to submit the job in the same cluster. If someone uses too much and we can't have a way to group the related variables, it's going to be a mess. But if it's through a named actor and jobs users have different named actors, then it's OK. We can limit memory usage through these actors to protect the cluster.

ericl commented 3 years ago

That sounds right to me. A couple clarifications: a Python script/driver defines a job, since we generate a unique job on connecting to a cluster. So in the Spark on Ray case, all actors run would be all within a single job.

Re: memory usage, the isolation here is not very well developed, but note this is a general problem in Ray and not something specific to detached objects. Hence, it is out of scope of this feature itself.

On Thu, Dec 10, 2020, 5:26 PM Yi Cheng notifications@github.com wrote:

I think I misunderstood some parts. So let me just summarize it here and please let me know if I was wrong:

  • Having a named actor shared across jobs to exchange data between them
  • If the user wants to do the cross job communication, pass the ownership to the actor
  • Multiple eviction policy can be supported here (detached, lru, binding it to anther job/actor)

Another thing I might want to be clarified here is what's a job? Is it a python script? Some demo code about what needs to be achieved should be helpful for me to understand it.

This isn't something we need to worry about though--- if the user pins too much, it's their problem. With an autoscaling cluster, we can also spill / add more nodes to handle this case.

I'm thinking that it's a cluster and multiple users will want to submit the job in the same cluster. If someone uses too much and we can't have a way to group the related variables, it's going to be a mess. But if it's through a named actor and jobs users have different named actors, then it's OK. We can limit memory usage through these actors to protect the cluster.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/ray-project/ray/issues/12635#issuecomment-742905079, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAADUSXOM64NL5Q6GBZZ3FDSUFYMRANCNFSM4UOG4ZQQ .

DonYum commented 3 years ago

Both @ericl and @ahbone 's opinions are impressive.

As @ahbone say, the management of the detached objs looks like shouldn't be given to the user. This will mess the code of user. However, In practice, we do need a 'method' to make some long-life objs.

Yes, using detached named actor to customize a 'detached objs' is a method. But it's to complete, and is implicit.

As a novice of ray, I spent hours to find some method to 'detach' a plasma obj In docs/issues and discussions, but I failed. I have to submit a new issue #12748 which lead me to here. The process was too painful. Along the way, I always thinking, why not provide such an API. Is this a technical problem? ...

In my opinion, ray core is like a toolbox, providing enough APIs is our responsibility, and how customers use it is another thing that we may not consider too much.

Hope my ideas will help. Thanks a lot!

fishbone commented 3 years ago

@DonYum Thanks for your ideas here, and I agree this is a useful feature to support. My concern here is not about providing such an API/feature but how we we should support this feature better. Early discussion of it will save us effort in the future. What worries me here is actually how to manage all these long-life objects. Suppose we have a lot of long-life objects created, now the user wants to free some objects through ray client, it's hard to know which one to free since they are all ids (maybe I was wrong, not an experienced ray user). Usually it might be useful for us to free all the global variables created by some job when they are not needed anymore. Binding them to a named actor sounds to me like a potential way to go.

@ericl Here for RayDP, as you mentioned that:

So in the Spark on Ray case, all actors run would be all within a single job.

In this way, it looks like all objects can be passed back to the driver. Why detached objects will help here? If you have some sample code about the usecase we'd like to support, it'll be helpful.

ericl commented 3 years ago

Perhaps we don't allow "true" global objects in the first iteration, to avoid the problems mentioned above. If an object can only be transferred to another actor, then that would solve the RayDP use case (single job) and also allow other use cases which span multiple jobs.

In this way, it looks like all objects can be passed back to the driver. Why detached objects will help here?

The issue is the objects are still owned by the producing worker (the one that call ray.put). If that worker is killed, the object is lost, even if the reference is passed back to the driver.

>>> @ray.remote
... class A:
...    def __init__(self):
...       self.ref = ray.put("hi")
...    def get(self):
...       return self.ref
... 
>>> 
>>> a = A.remote()
>>> ref = ray.get(a.get.remote())
>>> ray.get(ref)
'hi'
>>> ray.kill(a)
>>> ray.get(ref)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/eric/Desktop/ray/python/ray/worker.py", line 1378, in get
    raise value
ray.exceptions.ObjectLostError: Object ffffffffffffffffa67dc375e60ddd1a23bd3bb90100000002000000 is lost due to node failure.
fishbone commented 3 years ago

Thanks for the explanation. I'll spend some time to check how to support this version.

fishbone commented 3 years ago

@ericl, sorry for not making progress for a while. I recently picked it again.

I'm wondering whether this will make RayDP's case and other cases work: ref = ray.get(a.get.remote(), clone=True) Underline this get, we'll make the ref be the thing created by this task, so its life cycle is independent of the actor.

Since the value stored is immutable, whenever we need an updated value, we need to call actor's get again, which will generate a new ref.

Btw, actually doing ownership transfer is pretty hard, since the owner metadata is propagated inline with the object reference.

We don't need to take care of this, since ref is referencing the object owned by itself and all the other copied later on is about ref.

Please let me know whether it's the correct direction to go or not.

ericl commented 3 years ago

I'm wondering whether this will make RayDP's case and other cases work: ref = ray.get(a.get.remote(), clone=True) Underline this get, we'll make the ref be the thing created by this task, so its life cycle is independent of the actor.

Is this the same thing as ray.put(ray.get(a.get.remote())) -- making a copy? It could be another solution though the cost is we need to use twice the memory for a short period of time.

fishbone commented 3 years ago

I double-checked your sample code, and I realized that I misunderstood it. It's about return ObjectRef directly in the remote. I updated the API a little bit. I'm trying to avoid introducing the API to make the system like a global key-value store, but to make it more like a variable's ownership can be shared/moved.

>>> @ray.remote
... class A:
...    def __init__(self):
...       self.ref = ray.put("hi")
...    def get(self):
...       return self.ref
... 
>>> 
>>> a = A.remote()
>>> ref = ray.get(a.get.remote()).shared()
>>> ray.get(ref)

Here, if we give ObjectRef an API, maybe called shared, and the return, ref, is a local object id, and inside GCS, we have a link from ref -> loc.

And here, we need to update raylet to make an object shared by multiple tasks. I haven't checked how to do this yet, but I feel this makes the API looks cleaner, and the major updates should sit in raylet. As for inline value, since it's small and needs to be copied to remote tasks anyway, it should be OK to create a new object. If we do this:

shared_ref = ray.get(a.get.remote()).shared()
ray.get(shared_ref)

ref = ray.get(a.get.remote())
ray.get(ref)

There will be a duplicate; otherwise, there is no extra cost for memory. Correct me if I was wrong here.

So most of the updates should be in raylet. Here are some thoughts from me:

Maybe some step is hard to do, so please give me some comments.

fishbone commented 3 years ago

@ericl Here is my plan to support this

So the overall flow might look like

In this way, the objects' life-cycle stays the same as before. We also avoid duplicating low-level copy. And we give the fundamental support for ownership sharing. Maybe ownership sharing is enough for now. We can call delete in the first owner to support ownership transferring.

If you think it's good to go, I'll start implementing it.

ericl commented 3 years ago

Hmm this seems a little complicated, I'm not sure we should be changing plasma here or introducing entry sharing as a concept. How about we outline a couple different design options before coming to a conclusion?

I was originally thinking this would be a simple change, where the object is simply not deleted when the owner dies. If it turns out to be much harder then the feature might not be worth it.

Btw: there is a new ownership-based object directory that might conflict with this change: https://github.com/ray-project/ray/issues/12552

fishbone commented 3 years ago

@ericl Update plasma store to support obj sharing in low-level is not that complicated. We need to update ObjectTable with the value part to a raw pointer. And for the eviction, we need to manage it based on entry-level. Now it's ObjectId in the hash table with a linked list. We need to update its entry pointer in the hash table with a linked list. The overall update in this module is ok.

The sharing info tells the lower-level store that you don't need to create an entry, but instead, you can share it with the existing one. I haven't figured out how to ingest this sharing info in GCS or the new ownership-based object directory.

I was originally thinking this would be a simple change, where the object is simply not deleted when the owner dies. If it turns out to be much harder then the feature might not be worth it.

I thought about this before. But if this node(A) passes this obj_ref to another node(B), and B needs to download this info from the owner(X), which is dead. So B also needs this information that the obj_ref is owned by X but pinned in B. Without this information, B will think X is dead, and it'll throw errors as the current design. But with this info, it looks like supporting object sharing, which can be implemented better.

If we want to support pin an obj locally and the visibility is limited to the current node, it'll be a simpler change. We just don't delete the pinned obj even owner is dead. But if we'd like to implement ownership sharing, we need to make more changes.

ericl commented 3 years ago

I thought about this before. But if this node(A) passes this obj_ref to another node(B), and B needs to download this info from the owner(X), which is dead. So B also needs this information that the obj_ref is owned by X but pinned in B.

You can fall back to the GCS in this case right? The owner can respond that the object is owned by the GCS if it's up; if it's down the caller can auto-retry at the GCS just in case.

fishbone commented 3 years ago

@ericl Maybe I missed some parts in the system. I thought the owner needs to be up to access that object.

ericl commented 3 years ago

B wants to access ref, and it needs the owner to be up under the current design. It's fate-shared by the owner. X is a job doing some work, for example, accumulation, it finished the job and wants to pass the result to the driver. So X will not be up anymore after returning the results to the driver.

Right, but we can change the metadata resolution protocol to fall back to querying the GCS if X is down. So in this case, B can fallback to querying the GCS before giving up resolving the object metadata.

Btw, another related use case is that we might consider transferring object ownership to the GCS if it's ever spilled to external storage such as S3 (cc @rkooo567 @clarkzinzow ). This would allow the object to be retrieved even if the owner has been killed for some reason (might be useful for large scale distributed shuffle use cases). In this case, we would want to automatically transfer ownership to the GCS on spill without any API call from the user.

fishbone commented 3 years ago

@ericl

Right, but we can change the metadata resolution protocol to fall back to querying the GCS if X is down. So in this case, B can fallback to querying the GCS before giving up resolving the object metadata.

But even it falls back to querying the GCS, it still can't get the actual data since the owner is dead.

I feel that we are talking about different scenarios.

  1. One is to transfer/share the ownership from one node to another one. For example, a complicated actor finished a task and decided to pass the object created locally to the driver and it'll be terminated after. This is the case I was thinking about.
  2. Another one is to improve availability. We'd like to relax the restriction about fate-sharing. If a ref is used in some node and if in the cluster (or locally), it's available somewhere, we'll take that even the owner is down. We assume the owner will be up eventually. So in this case, the new API: ray.util.set_object_lifetime(obj_ref, lifetime="detached"|"lru"|<actorRef>) is a way to modify the behavior of eviction policy. In this case, we always query the local store first and the owner later.

Btw, another related use case is that we might consider transferring object ownership to the GCS if it's ever spilled to external storage such as S3 (cc @rkooo567 @clarkzinzow ). This would allow the object to be retrieved even if the owner has been killed for some reason (might be useful for large scale distributed shuffle use cases). In this case, we would want to automatically transfer ownership to the GCS on spill without any API call from the user.

Is this one also case 2?

ericl commented 3 years ago

But even it falls back to querying the GCS, it still can't get the actual data since the owner is dead.

If we transfer the metadata to the GCS this would fix that right? The protocol I'm imagining is:

Original owner:

  1. Transfer the metadata to the GCS
  2. Forward any future queries to the GCS

Resolver:

  1. First try the original owner
  2. If the owner says the object is transferred to the GCS, or the owner is down, retry at the GCS
  3. If the GCS says it doesn't know about the object, then an error is raised.
  4. Get the data (it should be available unless the node fails). We aren't worrying about the node failure case --- that could be handled with spilling.

One is to transfer/share the ownership from one node to another one. For example, a complicated actor finished a task and decided to pass the object created locally to the driver and it'll be terminated after. This is the case I was thinking about.

This seems like a more narrow case than general ownership transfer to GCS. Note that nodes don't own objects, worker processes within the node own objects. The problem here is not a node dying, it's the ability to kill a worker actor.

Another one is to improve availability. We'd like to relax the restriction about fate-sharing. If a ref is used in some node and if in the cluster (or locally), it's available somewhere, we'll take that even the owner is down. We assume the owner will be up eventually. So in this case, the new API: ray.util.set_object_lifetime(obj_ref, lifetime="detached"|"lru"|) is a way to modify the behavior of eviction policy. In this case, we always query the local store first and the owner later.

Hmm I don't quite get this one, once the owner is dead, it will never come back right?

Is this one also case 2?

I think it's neither; it's transferring ownership to the GCS.

fishbone commented 3 years ago

@ericl Thanks for the clarification. It looks like we are talking about the same problem. The only difference is about who should own that object. Let me summarize our ideas below, and please correct me if I was wrong.

First one:

Things need to update:

Second one:

Things need to update:

In my opinion about the pros and cons:

I feel the complexity of transferring ownership to GCS comes from object management, which looks different from before. Probably it's OK. The fallback can cause some performance issues, but it might be OK depending on the use case. The complexity of ownership sharing comes from the underline storage layer. For plasma store, it's ok from my understanding, not sure about spilling external storage.

ericl commented 3 years ago

Hmm I see, basically you're proposing a fast logical copy which generates a new object id. @stephanie-wang any thoughts on this vs ownership transfer?

We could potentially also take a copy approach to spilling.

Does it make sense to investigate the difficulty of these two approaches more first?

fishbone commented 3 years ago

@ericl You can view it as sharing (shard_ptr in cpp?). A new object id got generated by sharing the underline physical data. The new one behaves the same as the other object_ids. We create object_id by ray.put by now. And this one can create object_id from another object_id. The only difference will be how they are created, and the rest should be the same.

Does it make sense to investigate the difficulty of these two approaches more first?

Sure, I'll check the implementation of object life management for First one. I need to study that part anyway. Since this kind of thins should apply to spilled object too, I also need to check the complexity of updating that for Second one

rkooo567 commented 3 years ago

I like the @ericl's implementation (for simplicity). Transfer ownership to GCS just means GCS remembers which node the object is pinned right? It can be implemented in this way;

It seems like we can even guarantee the availability for disk external storage if we can force-spill replicas to multiple nodes (and ofc if it is distributed storage like S3, it is even easier).

rkooo567 commented 3 years ago

(Also, I believe we should refactor the plasma store to make it unit testable before we have any big changes)

fishbone commented 3 years ago

Thanks, @rkooo567, for the sharing your thoughts :)

Here, the complexity is about object-life-cycle management. In this way, we need to give GCS a way to manage this. After checking the code, I think it's not that easy, actually. Object lifecycle management is done in a worker not in GCS. So if we want transfer ownership to GCS, we need to abstract that layer out (otherwise, we need to maintain two logics for the same function) and make GCS able to manage that. This, from my opinion, makes GCS heavier and it doesn't looks like something should belong to GCS. With the copy way, each object_ref still has the owner, so it goes as the old way which fit the framework better from my understanding.

Update plasma store actually is not difficult here. It's about introducing a new API to link an object id to the value of an existing one. Besides, this module is simple and the scope is limited which makes it easier to be updated. More work is to add this meta in GCS and upperlayer's logic to fetch(if not exist)/link with new api(if existed). And when freed, underline physical value shouldn't be freed until all tasks which pin it free it.

And I checked local_store_manager where spilling is implemented. This logic looks like is hided from outside and something behind object_store. So here we only need to support this feature: free the value behind the object physically (in memory or external store). As for in memory object store, free this one if no one pins it. As for spilled store, we need to query GCS to get this.

@ericl My only concern is about objectlife management if ownership is transferred to GCS. The complexity of GCS will increase with the responsibility as the owner which belonged to task before.

ericl commented 3 years ago

Here's a thought, would transferring ownership be simpler if we only supported moving of the underlying data rather than a logical copy? (std::move vs std::shared_ptr).

That way we don't need to worry too much about multiple references to one buffer, we just transfer the data buffer to a new object id.

On Sun, Feb 7, 2021, 8:35 PM Yi Cheng notifications@github.com wrote:

Thanks, @rkooo567 https://github.com/rkooo567, for the sharing your thoughts :)

Here, the complexity is about object-life-cycle management. In this way, we need to give GCS a way to manage this. After checking the code, I think it's not that easy, actually. Object lifecycle management is done in a worker not in GCS. So if we want transfer ownership to GCS, we need to abstract that layer out (otherwise, we need to maintain two logics for the same function) and make GCS able to manage that. This, from my opinion, makes GCS heavier and it doesn't looks like something should belong to GCS. With the copy way, each object_ref still has the owner, so it goes as the old way which fit the framework better from my understanding.

Update plasma store actually is not difficult here. It's about introducing a new API to link an object id to the value of an existing one. Besides, this module is simple and the scope is limited which makes it easier to be updated. More work is to add this meta in GCS and upperlayer's logic to fetch(if not exist)/link with new api(if existed). And when freed, underline physical value shouldn't be freed until all tasks which pin it free it.

And I checked local_store_manager where spilling is implemented. This logic looks like is hided from outside and something behind object_store. So here we only need to support this feature: free the value behind the object physically (in memory or external store). As for in memory object store, free this one if no one pins it. As for spilled store, we need to query GCS to get this.

@ericl https://github.com/ericl My only concern is about objectlife management if ownership is transferred to GCS. The complexity of GCS will increase with the responsibility as the owner which belonged to task before.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/ray-project/ray/issues/12635#issuecomment-774859329, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAADUSTMBWWHKWNHFZVOUSLS55SX5ANCNFSM4UOG4ZQQ .

rkooo567 commented 3 years ago

I fee like the conversation got too long, and it is hard to track now. Why don’t we make a doc that can compare both implementation? (Like goals of the api, and how each api solves the problem, impl details and complexity, pros and cons, and etc.) btw, plz feel free to keep conversation with Eric about the second proposal.

On Sun, Feb 7, 2021 at 8:38 PM Eric Liang notifications@github.com wrote:

Here's a thought, would transferring ownership be simpler if we only supported moving of the underlying data rather than a logical copy? (std::move vs std::shared_ptr).

That way we don't need to worry too much about multiple references to one buffer, we just transfer the data buffer to a new object id.

On Sun, Feb 7, 2021, 8:35 PM Yi Cheng notifications@github.com wrote:

Thanks, @rkooo567 https://github.com/rkooo567, for the sharing your thoughts :)

Here, the complexity is about object-life-cycle management. In this way, we need to give GCS a way to manage this. After checking the code, I think it's not that easy, actually. Object lifecycle management is done in a worker not in GCS. So if we want transfer ownership to GCS, we need to abstract that layer out (otherwise, we need to maintain two logics for the same function) and make GCS able to manage that. This, from my opinion, makes GCS heavier and it doesn't looks like something should belong to GCS. With the copy way, each object_ref still has the owner, so it goes as the old way which fit the framework better from my understanding.

Update plasma store actually is not difficult here. It's about introducing a new API to link an object id to the value of an existing one. Besides, this module is simple and the scope is limited which makes it easier to be updated. More work is to add this meta in GCS and upperlayer's logic to fetch(if not exist)/link with new api(if existed). And when freed, underline physical value shouldn't be freed until all tasks which pin it free it.

And I checked local_store_manager where spilling is implemented. This logic looks like is hided from outside and something behind object_store. So here we only need to support this feature: free the value behind the object physically (in memory or external store). As for in memory object store, free this one if no one pins it. As for spilled store, we need to query GCS to get this.

@ericl https://github.com/ericl My only concern is about objectlife management if ownership is transferred to GCS. The complexity of GCS will increase with the responsibility as the owner which belonged to task before.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub <https://github.com/ray-project/ray/issues/12635#issuecomment-774859329 , or unsubscribe < https://github.com/notifications/unsubscribe-auth/AAADUSTMBWWHKWNHFZVOUSLS55SX5ANCNFSM4UOG4ZQQ

.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/ray-project/ray/issues/12635#issuecomment-774860238, or unsubscribe https://github.com/notifications/unsubscribe-auth/AENHHICVXSXWVEGIOTMKFXDS55TGFANCNFSM4UOG4ZQQ .

fishbone commented 3 years ago

@rkooo567 That's a good suggestion. Usually, I'd prefer a meeting which is more efficient actually. I'll create a doc with a comparison of solutions here.

@ericl That's one way to do this, which simplifies the lower-level module, but it increases the complexity at a higher level.

First of all, move has to happen at the owner's side. Otherwise, the owner has the possibility of losing the object at any time (if we are OK with that, it might also be OK???). We might introduce something like this:

>>> @ray.remote
... class A:
...    def __init__(self):
...       self.ref = ray.put("hi")
...    def get(self, move=False):
...       if move:
...           return ray.move(self.ref)
...       return self.ref
... 
>>> 
>>> a = A.remote()
>>> ref1 = A.get(False)
>>> ref2 = A.get(True)
>>> ray.get(ref2) # ok
>>> ray.get(ref1) # error

First of all, after callingA.get(true), in the actor, self.ref will be invalid anymore, and all the other tasks, including the original owner itself, will not be able to access ref1. This gives the burden to the user.

To resolve this, 1) we need to write the mapping into GCS, and the resolver needs to resolve ref1 to ref2. We need this resolving for all refs since we are not sure whether it got moved or not, which might decrease the performance. For sharing case, only when deleting an object or the object doesn't exist locally, we need query GCS. 2) We need to update the object store to support move. 4) Reference counter meta needs to be transferred too. 5) Now, the borrower needs to notify the new owner. Since we don't know when will 4) finish and 5) happen, ordering needs to be maintained.

ericl commented 3 years ago

Hmm in many cases it's probably totally fine to "give the burden to the user", since ownership transfer should only be needed in specific rare situations the user is in full control over.

If we assume it's ok to totally invalidate ref1 after the move, then we should be able to support this:

ref2 = ray.experimental.transfer_ownership_to_self(ref1)
ray.get(ref1)  # object not found (as if it was evicted)
ray.get(ref2)  # normal object owned by the current process
stephanie-wang commented 3 years ago

I haven't read the thread fully, so sorry if this is already mentioned, but one idea is to have an API to "extract" an object. It would work well for objects that have already been spilled, since those objects already have a URL (filesystem or S3 path) that can be used to get the object directly. For objects that haven't already been spilled, we could either provide the node ID or force-spill the object.

We would just need to make sure not to delete those objects through the normal garbage collection scheme.

ericl commented 3 years ago

Extraction sounds pretty good for spilled objects, though iiuc this is an orthogonal use case from the ownership transfer of plasma objects right?

On Mon, Feb 8, 2021, 10:06 AM Stephanie Wang notifications@github.com wrote:

I haven't read the thread fully, so sorry if this is already mentioned, but one idea is to have an API to "extract" an object. It would work well for objects that have already been spilled, since those objects already have a URL (filesystem or S3 path) that can be used to get the object directly. For objects that haven't already been spilled, we could either provide the node ID or force-spill the object.

We would just need to make sure not to delete those objects through the normal garbage collection scheme.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/ray-project/ray/issues/12635#issuecomment-775336204, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAADUSWHVED744BEPUO5AIDS6AR2DANCNFSM4UOG4ZQQ .

stephanie-wang commented 3 years ago

Extraction sounds pretty good for spilled objects, though iiuc this is an orthogonal use case from the ownership transfer of plasma objects right?

Yes, it would be for if you want to use an object from outside a Ray cluster completely, versus wanting to use it from another Ray job. A good example is if you want to checkpoint some final output.

fishbone commented 3 years ago

Sorry for the late reply. What a day.

@stephanie-wang Thanks for the comments. Extraction sounds supporting the semantic of sharing, which is one way we can actually go. In this case, I'll prefer supporting sharing semantic for non-spilled objects in some way too.

@ericl If we don't care about this, what you proposed sounds good to me. Most work looks like it is in raylet and GCS. In this way, we support transferring. I think we can go with this way first, and if there is a need for sharing semantic, we can plan that later. I'll write a doc summarizing what was discussed here, which might be useful in the future. And in the meantime, I'll start to check how to implement it. I'll come back if I found some other issues.

fschlimb commented 3 years ago

A very interesting but long discussion, please be lenient with me if I missed something.

What I do not understand is how it is supposed to be used. Even if I have a named actor, how would I know what objref I want to transfer to another actor/process? As far as I can tell, this would mean that each such named actor would have to explicitly implement and expose some kind of an lookup mechanism (e.g. a key-value store) since different borrowers might want to access the same data. This looks such a thing would need reimplementation frequently while it could be provide as a generic feature.

Put differently, without such a lookup mechanism, objrefs can only be known/accessed if the borrowing task and the generating/owning task have a common ancestor in the task-graph. This is pretty limiting - in particular if talking about sharing data between jobs.

Proper data distribution is the key to efficiency in a distributed environment. Index- or tag-spaces are a common and effective approach to control data distribution. From a user perspective, having a mechanism to define and use a global tag/index space (and not thinking about ownership) would be a powerful feature and make sharing data across jobs much easier. Of course this will also require a proper handling of the GC issue...

I think something along the lines of a key-value approach as indicated in the initial post by @ericl would be most useful. I understand @iycheng 's concern about GC in such a kv-store. Similar to the thoughts about user-burden with handling ownership, I am convinced this can be addressed by requiring some user-guidance for GC. It would still be a safe, useful, usable and powerful feature.

fishbone commented 3 years ago

@fschlimb first of all, thanks for the sharing.

Sorry for the confusion, but the reason I mentioned not a kv-store is just about the API. The functionality is actually the same as before. The information passed between is still the same. For example, either way, we still need to put an object to the store to get an object_ref, and we still need the object_ref to be passed to other jobs to be accessed. This feature provides a way to transfer/share the ownership between tasks instead of supporting object sharing between jobs, so I thought it should be good to make it not like a kv-store.

From my understanding, ownership sharing and transferring are like cpp's smart pointer's ownership sharing and transferring. While sharing objects between is like using external storage to share data across. In the first case, lifecycle management should be covered by the framework, and we need to ensure data will be freed if the driver ended. In the latter case, usually, it's the user's responsibility to delete the data. We also need to introduce the namespace and naming concept; otherwise, it'll be super difficult to use. It should have some use cases and also more difficult to support. Considering these are about different use cases, we can start a new thread discussing that.

I'm sorry that the discussion is getting too long. :( I'll write a document recently summarizing everything here and the final decision we'll plan to go and some examples about how to use this. I'll ping you when it's ready.

fschlimb commented 3 years ago

Thanks @iycheng for carving this out.

I started a discussion here: https://discuss.ray.io/t/creating-objref-from-manually-created-object-id/785/2. Is this the right forum or should I also open an issue here on github?

Is @ericl's example with the actors taking the data with them when dying the motivating example for ownership transferal? Or are there other examples? I ask because I still can't see the problem it would be solving.

Actually, I think the current behavior (like in @ericl's example) is at least unintuitive. The natural expectation is that an object will never disappear as long as an ObjRef to it exists. I'd prefer being asked to actively address the actual problem (GC) rather than unpredictably getting stale references. The user should not have to deal with artefacts from the inner workings of the runtime.

Also, transferring ownership does not seem to compose well. Let's assume different packages/modules compose by exchanging ObjRefs. Projects built of such modules will force everyone trying to get ownership to every ObjRef since how would a module know one needs to get ownership or not?

Am I missing something?

ericl commented 3 years ago

I agree it would be best to transparently handle this case. We should at least take a look at the options there to see if there is a feasible solution.

On Wed, Feb 10, 2021, 3:56 AM Frank Schlimbach notifications@github.com wrote:

Thanks @iycheng https://github.com/iycheng for carving this out.

I started a discussion here: https://discuss.ray.io/t/creating-objref-from-manually-created-object-id/785/2. Is this the right forum or should I also open an issue here on github?

Is @ericl https://github.com/ericl's example with the actors taking the data with them when dying the motivating example for ownership transferal? Or are there other examples? I ask because I still can't see the problem it would be solving.

Actually, I think the current behavior (like in @ericl https://github.com/ericl's example) is at least unintuitive. The natural expectation is that an object will never disappear as long as an ObjRef to it exists. I'd prefer being asked to actively address the actual problem (GC) rather than unpredictably getting stale references. The user should not have to deal with artefacts from the inner workings of the runtime.

Also, transferring ownership does not seem to compose well. Let's assume different packages/modules compose by exchanging ObjRefs. Projects built of such modules will force everyone trying to get ownership to every ObjRef since how would a module know one needs to get ownership or not?

Am I missing something?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/ray-project/ray/issues/12635#issuecomment-776657073, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAADUSTNDPHCQJWZXSJQR4DS6JX5HANCNFSM4UOG4ZQQ .

fishbone commented 3 years ago

For anyone who might be lost in this long discussion, I have a summary for this thread. Please leave a comment there if you have any questions/suggestions. cc @ericl @stephanie-wang @fschlimb @rkooo567 @DonYum

fschlimb commented 3 years ago

@iycheng Shouldn't a fourth solution be considered which simply treats an objref like a shared_ptr? E.g. nothing needed from the user, as long as one ObjRef exists the object will be available. This will probably imply moving objects to the GCS if the owner dies and additional effort to get the ref-counting right. As mentioned before, this seems to be the only fully composable option.

fishbone commented 3 years ago

@fschlimb The second one is almost similar to the one you mentioned. If we make it default for all variables which introduces extra performance cost. So at someplace, we should have some kind of call to tell the system, it's shared_ptr, not unique_ptr. In the second solution, it's like the caller will do this. Or we make callee do this kind of work, like return an ObjectRef and user set the 'shared bit' set to true and in caller side, it'll just do this work automatically.

From the API we can do this one easily if we support the second proposal. One is explicitly shared and the other one is default shared.

fschlimb commented 3 years ago

@iycheng Yes, I agree, these are closely related.

The point I am trying to make is that I do not see how 2 (or 1 and 3) is (are) composable. Without shared being the default, how would different packages orchestrated to a larger program synchronize? I see only 2 practical options: 1. all producers explicitly declare all outgoing refs shared or 2. the consumers always convert all incoming objrefs to shared. Both are pretty fragile/error prone and cumbersome.

fishbone commented 3 years ago

@fschlimb I'm not an expert in ray's using patterns, so correct me if I'm wrong. From my understanding, the actor will be killed by the user ( not sure :p ), and the user knows which object will be used after killing. So, in this case, the user only needs to make that particular object shared.

One difference between (2) and (4) is that (2) always generates new object id while (4) reuse the old one. The first one make it fit into the current framework easily, and we'll only have overhead for the necessary one.

Default is definitely one way to do this at the API level, but we need to revisit the reference counting algorithm and memory management. It'll be pinned at the node for shared objects to avoid eviction. If we make it default by all, it'll waste a lot of memory.

guykhazma commented 3 years ago

Hi,

is this issue still active? was there any progress?

Thanks

kira-lin commented 3 years ago

Hi all, First, thank you all for working on this issue! I am from the raydp team. This feature is very important in our project, so we want to settle on a solution and implement it.

I have read the proposals. @ericl @iycheng Is there any progress since then? All of them work for us, but some are harder to implement than others. So far the proposals assume objects are already in the plasma, but is it possible for a worker to put an object into plasma on behalf of another worker? In other words, we can add an API like ray.put(obj, owner=another_worker). Here another_worker could be a global named worker. Changing the ownership info in the first place might save us from having inconsistent copies. I'm not an expert of ray core, so please correct me if I'm wrong.

Thanks

ericl commented 3 years ago

We have a public design doc here on the future approach, which is in progress: https://docs.google.com/document/d/1Immr9m049Ikj8Pq3G06Csm8sHyhBvzpWENGB5lHPbOY/edit?usp=drivesdk

On Sun, May 30, 2021, 11:41 PM Zhi Lin @.***> wrote:

Hi all, First, thank you all for working on this issue! I am from the raydp https://github.com/oap-project/raydp team. This feature is very important in our project, so we want to settle on a solution and implement it.

I have read the proposals. @ericl https://github.com/ericl @iycheng https://github.com/iycheng Is there any progress since then? All of them work for us, but some are harder to implement than others. So far the proposals assume objects are already in the plasma, but is it possible for a worker to put an object into plasma on behalf of another worker? In other words, we can add an API like ray.put(obj, owner=another_worker). Here another_worker could be a global named worker. Changing the ownership info in the first place might save us from having inconsistent copies. I'm not an expert of ray core, so please correct me if I'm wrong.

Thanks

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/ray-project/ray/issues/12635#issuecomment-851232056, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAADUSQYMOHA3Q6TBMHXX4TTQMVSJANCNFSM4UOG4ZQQ .

ericl commented 2 years ago

I've updated the description given the current state of things. The proposal is to extend the ray.put with owner API to allow GCS owned objects. In the future, we can then enable automatic transfer to GCS.