ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.

[Ray component: RLlib] enable_env_runner_and_connector_v2 NotImplementedError #45464

Open timborden opened 3 months ago

timborden commented 3 months ago

What happened + What you expected to happen

Enabling the new V2 API stack raises a `NotImplementedError` in `BatchIndividualItems` (a `ConnectorV2`) during sampling.

Versions / Dependencies

Google Colab, Python 3.10, Ray 2.22.0

Reproduction script

from ray.rllib.algorithms.ppo import PPOConfig

num_agents = 20
num_steps = 1024

config = (
    PPOConfig()
    .framework("tf2")
    .api_stack(
        enable_rl_module_and_learner=True,
        enable_env_runner_and_connector_v2=True,
    )
    .environment(
        CustomMultiAgentEnv,
        env_config={
            "num_agents": num_agents,
            "num_steps": num_steps,
        },
    )
    .multi_agent(
        policies=["policy"],
        # Map every agent to the single policy defined above.
        policy_mapping_fn=lambda *args, **kwargs: "policy",
        count_steps_by="agent_steps",
    )
    .env_runners(
        num_env_runners=0,
        sample_timeout_s=60.0 * (num_steps / 4000),
        rollout_fragment_length="auto",
    )
    .training(
        train_batch_size=int(num_steps * num_agents),
        sgd_minibatch_size=int(num_steps * num_agents / 4),
    )
    .checkpointing(
        export_native_model_files=True,
    )
)

result = config.build().train()
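
The actual `CustomMultiAgentEnv` is user-specific and not included in this report. As a hypothetical stand-in, any `MultiAgentEnv` that steps several agents per timestep should exercise the same connector path; a minimal sketch along those lines:

# Hypothetical stand-in for the user's CustomMultiAgentEnv (not part of the
# original report). It emits random observations for a fixed set of agents
# and terminates after `num_steps` timesteps.
import gymnasium as gym
import numpy as np
from ray.rllib.env.multi_agent_env import MultiAgentEnv


class CustomMultiAgentEnv(MultiAgentEnv):
    def __init__(self, config=None):
        super().__init__()
        config = config or {}
        self._num_agents = config.get("num_agents", 2)
        self._num_steps = config.get("num_steps", 100)
        self._agent_ids = {f"agent_{i}" for i in range(self._num_agents)}
        # Per-agent observation/action spaces, keyed by agent id.
        self.observation_space = gym.spaces.Dict(
            {aid: gym.spaces.Box(-1.0, 1.0, (4,), np.float32) for aid in self._agent_ids}
        )
        self.action_space = gym.spaces.Dict(
            {aid: gym.spaces.Discrete(2) for aid in self._agent_ids}
        )
        self._t = 0

    def reset(self, *, seed=None, options=None):
        self._t = 0
        return self.observation_space.sample(), {}

    def step(self, action_dict):
        self._t += 1
        obs = self.observation_space.sample()
        rewards = {aid: 0.0 for aid in self._agent_ids}
        terminateds = {"__all__": self._t >= self._num_steps}
        truncateds = {"__all__": False}
        return obs, rewards, terminateds, truncateds, {}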

Issue Severity

High: It blocks me from completing my task.

simonsays1980 commented 3 months ago

@timborden Thanks for filing this. Do you have a stack trace or a reproducible example?

timborden commented 3 months ago

@simonsays1980 Thanks for picking it up and taking a look! Here's the stack trace:

2024-05-21 12:09:50,035 WARNING deprecation.py:50 -- DeprecationWarning: `WorkerSet(num_workers=... OR local_worker=...)` has been deprecated. Use `EnvRunnerGroup(num_env_runners=... AND local_env_runner=...)` instead. This will raise an error in the future!
2024-05-21 12:09:50,038 WARNING deprecation.py:50 -- DeprecationWarning: `max_num_worker_restarts` has been deprecated. Use `AlgorithmConfig.max_num_env_runner_restarts` instead. This will raise an error in the future!
2024-05-21 12:09:51,231 WARNING util.py:61 -- Install gputil for GPU system monitoring.
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<ipython-input-8-7f81266a3521> in <cell line: 1>()
----> 1 result = algo.build().train()
      2 print(tune.logger.pretty_print(result))

/usr/local/lib/python3.10/dist-packages/ray/tune/trainable/trainable.py in train(self)
    329         except Exception as e:
    330             skipped = skip_exceptions(e)
--> 331             raise skipped from exception_cause(skipped)
    332 
    333         assert isinstance(result, dict), "step() needs to return a dict."

/usr/local/lib/python3.10/dist-packages/ray/tune/trainable/trainable.py in train(self)
    326         start = time.time()
    327         try:
--> 328             result = self.step()
    329         except Exception as e:
    330             skipped = skip_exceptions(e)

/usr/local/lib/python3.10/dist-packages/ray/rllib/algorithms/algorithm.py in step(self)
    871         #   evaluate after the training iteration is entirely done.
    872         else:
--> 873             train_results, train_iter_ctx = self._run_one_training_iteration()
    874 
    875         # Sequential: Train (already done above), then evaluate.

/usr/local/lib/python3.10/dist-packages/ray/rllib/algorithms/algorithm.py in _run_one_training_iteration(self)
   3154                     # Try to train one step.
   3155                     with self._timers[TRAINING_STEP_TIMER]:
-> 3156                         results = self.training_step()
   3157 
   3158         return results, train_iter_ctx

/usr/local/lib/python3.10/dist-packages/ray/rllib/algorithms/ppo/ppo.py in training_step(self)
    422         # New API stack (RLModule, Learner, EnvRunner, ConnectorV2).
    423         if self.config.enable_env_runner_and_connector_v2:
--> 424             return self._training_step_new_api_stack()
    425         # Old and hybrid API stacks (Policy, RolloutWorker, Connector, maybe RLModule,
    426         # maybe Learner).

/usr/local/lib/python3.10/dist-packages/ray/rllib/algorithms/ppo/ppo.py in _training_step_new_api_stack(self)
    433             # Sample in parallel from the workers.
    434             if self.config.count_steps_by == "agent_steps":
--> 435                 episodes, env_runner_results = synchronous_parallel_sample(
    436                     worker_set=self.workers,
    437                     max_agent_steps=self.config.total_train_batch_size,

/usr/local/lib/python3.10/dist-packages/ray/rllib/execution/rollout_ops.py in synchronous_parallel_sample(worker_set, max_agent_steps, max_env_steps, concat, sample_timeout_s, _uses_new_env_runners, _return_metrics)
     90         # samples.
     91         if worker_set.num_remote_workers() <= 0:
---> 92             sampled_data = [worker_set.local_worker().sample()]
     93             if _return_metrics:
     94                 stats_dicts = [worker_set.local_worker().get_metrics()]

/usr/local/lib/python3.10/dist-packages/ray/rllib/env/multi_agent_env_runner.py in sample(self, num_timesteps, num_episodes, explore, random_actions, force_reset)
    147         # Sample n timesteps.
    148         if num_timesteps is not None:
--> 149             samples = self._sample_timesteps(
    150                 num_timesteps=num_timesteps,
    151                 explore=explore,

/usr/local/lib/python3.10/dist-packages/ray/rllib/env/multi_agent_env_runner.py in _sample_timesteps(self, num_timesteps, explore, random_actions, force_reset)
    249             else:
    250                 # Env-to-module connector.
--> 251                 to_module = self._cached_to_module or self._env_to_module(
    252                     rl_module=self.module,
    253                     episodes=[self._episode],

/usr/local/lib/python3.10/dist-packages/ray/rllib/connectors/env_to_module/env_to_module_pipeline.py in __call__(self, rl_module, data, episodes, explore, shared_data, **kwargs)
     23         # Make sure user does not necessarily send initial input into this pipeline.
     24         # Might just be empty and to be populated from `episodes`.
---> 25         return super().__call__(
     26             rl_module=rl_module,
     27             data=data if data is not None else {},

/usr/local/lib/python3.10/dist-packages/ray/rllib/connectors/connector_pipeline_v2.py in __call__(self, rl_module, data, episodes, explore, shared_data, **kwargs)
     66             timer = self.timers[str(connector)]
     67             with timer:
---> 68                 data = connector(
     69                     rl_module=rl_module,
     70                     data=data,

/usr/local/lib/python3.10/dist-packages/ray/rllib/connectors/common/batch_individual_items.py in __call__(self, rl_module, data, episodes, explore, shared_data, **kwargs)
     87             # [module_id] -> [col0] -> [list of items]
     88             else:
---> 89                 raise NotImplementedError
     90 
     91         return data

NotImplementedError: