ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
31.94k stars 5.44k forks source link

[core][experimental] shared memory channel can't finish begin_read #45855

Open kevin85421 opened 3 weeks ago

kevin85421 commented 3 weeks ago

What happened + What you expected to happen

import ray
import ray.experimental.channel as ray_channel

@ray.remote(num_cpus=1)
class Actor:
    def __init__(self):
        pass

    def pass_channel(self, channel):
        self._chan = channel

    def create_channel(self, writer, readers):
        self._chan = ray_channel.Channel(writer, readers)
        return self._chan

    def read(self):
        return self._chan.begin_read()

    def end_read(self):
        self._chan.end_read()

    def write(self, value):
        self._chan.write(value)

actor1 = Actor.remote()
actor2 = Actor.remote()

# actor1 writes data to Channel and two Ray tasks on actor2 read it.
actor1_output_channel = ray.get(
    actor1.create_channel.remote(actor1, [actor2, actor2])
)
ray.get(actor2.pass_channel.remote(actor1_output_channel))
ray.get(actor1.write.remote("world hello"))
assert ray.get(actor2.read.remote()) == "world hello"
ray.get(actor2.end_read.remote())
assert ray.get(actor2.read.remote()) == "world hello"
ray.get(actor2.end_read.remote())

Versions / Dependencies

nightly

Reproduction script

See the above section.

Issue Severity

None

kevin85421 commented 3 weeks ago

cc @jackhumphries

jackhumphries commented 3 weeks ago

This is the intended behavior. There is one MutableObjectManager instance per actor, and this manager expects that an actor will only read a mutable object once. Thus, the actor is blocked on the next read until a write occurs.