ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[RLlib] Unable to call ray.remote functions inside env/action dist #26468

Open HJasperson opened 2 years ago

HJasperson commented 2 years ago

What happened + What you expected to happen

I have a multi-agent environment where the reward is calculated per agent. The reward function is fairly expensive, so I'd like to run it in parallel across the agents. Whenever I try something like the script below, it hangs at the first call to the remote function.

I've also tried something similar inside a custom action distribution with the same result. Moving the slow function out of the class has no impact.

Versions / Dependencies

Python 3.9.12, Ray 1.12.0, RHEL 7.9

Reproduction script

import gym
import numpy as np
import time
import ray
from ray.rllib.env.multi_agent_env import MultiAgentEnv

class customEnv(MultiAgentEnv):
    def __init__(self, num, thresh):
        super().__init__()
        self.numAgents = num
        self.thresh = thresh
        self.someProperty = np.array([])
        self._agent_ids = set(range(num))
        self.observation_space = gym.spaces.Discrete(2)
        self.action_space = gym.spaces.Discrete(2)

    def reset(self):
        self.someProperty = np.zeros(self.numAgents)
        obs = {}
        for i in range(0, self.numAgents):
            obs[str(i)] = self.someProperty[i]
        return obs

    def step(self, action_dict):
        action = np.array(list(action_dict.values()))
        self.someProperty = self.someProperty + action

        ################## remote call ##################
        allRewards = ray.get([self.slowFunction.remote(self.someProperty[i]) for i in range(0,self.numAgents)])
        #################################################

        obs, rew, done, info = {}, {}, {}, {}
        allDone = True
        for i in range(0, self.numAgents):
            obs[str(i)] = self.someProperty[i]
            rew[str(i)] = allRewards[i]
            if self.someProperty[i] > self.thresh:
                done[str(i)] = True
            else:
                done[str(i)] = False
                allDone = False
        done["__all__"] = allDone
        return obs, rew, done, info

    ################## remote function ##################
    @ray.remote
    def slowFunction(self, x):
        time.sleep(500)
        return x

Issue Severity

Medium: It is a significant difficulty but I can work around it.

sven1977 commented 2 years ago

Hey @HJasperson, thanks for filing this issue. Your code looks fine to me, and I don't know of any reason why this shouldn't work. I have a few questions, though. I would expect every ray.get() call in your step method to block for roughly 500s, since all parallelized tasks sleep for exactly 500s and only then return. The ray.get call is blocking, so the environment will NOT continue until all slowFunctions have returned their results.

sven1977 commented 2 years ago

There are a couple more flaws in your repro. Please test your reproduction scripts to make sure they show the reported error before opening an issue.

HJasperson commented 2 years ago

the environment will NOT continue until all slowFunctions have returned their results

The problem is that it NEVER continues. I know how long this method should take and the environment hangs even after an excessive amount of time has passed.

I'm also pretty sure leaving this as a method (instead of an external function) will serialize the entire env

That was my thought as well, but calling an external method gives the same result (i.e. env hangs).

The script is just to illustrate what I was talking about. I am unwilling to share my actual code with you until it is published.