google-deepmind / acme

A library of reinforcement learning components and agents
Apache License 2.0

Errors while running DistributedD4PG #267

Closed kmukeshreddy closed 1 month ago

kmukeshreddy commented 2 years ago

Hello, I am passing a custom gym environment to the DistributedD4PG.

Sample code:

            distributed_agent = DistributedD4PG(environment=train_environment, 
                        networks_dict=agent_networks,
                        agent=agent_d3pg,
                        obj_func = obj_func,
                        critic_loss_type=critic,
                        threshold=threshold,
                        accelerator="GPU",
                        num_actors=2,
                        num_caches=0,
                        environment_spec=environment_spec,
                        batch_size=agent.batch_size,
                        n_step=agent.n_step,
                        sigma=agent.sigma,
                        discount=agent.discount,
                        policy_optimizer=snt.optimizers.Adam(agent.lr),
                        critic_optimizer=snt.optimizers.Adam(agent.lr),
                        max_actor_steps=init_ttm*frq*train_sim,
                        log_every= 1.0,
                        )
            try:
                program = distributed_agent.build()
                logger.info(f"{constants.STATUS} Distributed Program Build is successful!")
                control = lp.launch(program, launch_type=lp.LaunchType.LOCAL_MULTI_THREADING, terminal='current_terminal')
                control.wait()

The build is successful with no errors, but when running the program via lp.launch it throws an error. I made sure that there were no bugs in agent_distributed.py, since we use a custom gym env.

Error:

[2022-09-13 18:14:50,172][root][INFO] - [Actor] Actor Episodes = 1 | Actor Steps = 2 | Episode Length = 2 | Episode Return = -2.2366318702697754 | Steps Per Second = 1.082
Node ThreadWorker(thread=<Thread(evaluator, stopped daemon 22920789145344)>, future=<Future at 0x14df6b38dac0 state=finished raised UFuncTypeError>) crashed:
Traceback (most recent call last):
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 461, in _check_workers
    worker.future.result()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/concurrent/futures/_base.py", line 438, in result
    return self.__get_result()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 238, in run_inner
    future.set_result(f())
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/nodes/python/node.py", line 75, in _construct_function
    return functools.partial(self._function, *args, **kwargs)()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/nodes/courier/node.py", line 130, in run
    instance.run()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/environment_loop.py", line 176, in run
    result = self.run_episode()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/environment_loop.py", line 125, in run_episode
    episode_return = tree.map_structure(operator.iadd,
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/tree/__init__.py", line 430, in map_structure
    [func(*args) for args in zip(*map(flatten, structures))])
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/tree/__init__.py", line 430, in <listcomp>
    [func(*args) for args in zip(*map(flatten, structures))])
numpy.core._exceptions._UFuncOutputCastingError: Cannot cast ufunc 'add' output from dtype('O') to dtype('float32') with casting rule 'same_kind'

Questions:

  1. Could you please let me know how to resolve it?
  2. Is there any other way to run DistributedD4PG without lp.launch?

Additional details:

  1. The run was successful when using D4PG as mentioned here: https://github.com/deepmind/acme/blob/master/acme/agents/tf/d4pg/agent_test.py, but it throws an error for DistributedD4PG.
  2. Please let me know if any additional details are required, regarding the underlying code or something else.

Thank you!

kmukeshreddy commented 2 years ago

@qstanczyk or @ethanluoyc, maybe you could please provide some insight.

kmukeshreddy commented 2 years ago

To add more: there is also another error that I am encountering.

[2022-09-13 18:52:21,276][root][INFO] - [Evaluator] Episode Length = 44 | Episode Return = -26.131765365600586 | Evaluator Episodes = 1 | Evaluator Steps = 44 | Steps Per Second = 17.810
Node ThreadWorker(thread=<Thread(actor, stopped daemon 22609301317376)>, future=<Future at 0x149071f16e80 state=finished raised ValueError>) crashed:
Traceback (most recent call last):
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 461, in _check_workers
    worker.future.result()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/concurrent/futures/_base.py", line 438, in result
    return self.__get_result()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 238, in run_inner
    future.set_result(f())
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/nodes/python/node.py", line 75, in _construct_function
    return functools.partial(self._function, *args, **kwargs)()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/nodes/courier/node.py", line 130, in run
    instance.run()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/environment_loop.py", line 176, in run
    result = self.run_episode()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/environment_loop.py", line 109, in run_episode
    self._actor.observe(action, next_timestep=timestep)
  File "/home/mukeshrk/AutoHedges/autohedge/solver/rl/common/actors.py", line 86, in observe
    self._adder.add(action, next_timestep)
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/adders/reverb/transition.py", line 133, in add
    super().add(*args, **kwargs)
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/adders/reverb/base.py", line 207, in add
    self._write()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/adders/reverb/transition.py", line 169, in _write
    reward, discount = tree.map_structure(
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/tree/__init__.py", line 430, in map_structure
    [func(*args) for args in zip(*map(flatten, structures))])
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/tree/__init__.py", line 430, in <listcomp>
    [func(*args) for args in zip(*map(flatten, structures))])
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/adders/reverb/transition.py", line 151, in <lambda>
    get_all_np = lambda x: x[self._first_idx:self._last_idx].numpy()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/reverb/trajectory_writer.py", line 595, in __getitem__
    return TrajectoryColumn(self._slice(val), path=path)
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/reverb/trajectory_writer.py", line 620, in __init__
    raise ValueError('TrajectoryColumns cannot contain any None data '
ValueError: TrajectoryColumns cannot contain any None data references, got [<reverb.libpybind.WeakCellRef object at 0x148ea8de4a70>, <reverb.libpybind.WeakCellRef object at 0x1490280553b0>, <reverb.libpybind.WeakCellRef object at 0x148f900fb270>, <reverb.libpybind.WeakCellRef object at 0x148f901a2cb0>, None] for TrajectoryColumn at path ('reward', slice(2, 7, None))
[2022-09-13 18:52:21,435][absl][INFO] - Caught SIGTERM: forcing a checkpoint save.
[2022-09-13 18:52:21,436][absl][INFO] - Saving checkpoint: /home/mukeshrk/acme/24e92b48-3395-11ed-b8ae-ac1f6b24e18a/checkpoints/counter
Error Description: TrajectoryColumns cannot contain any None data references, got [<reverb.libpybind.WeakCellRef object at 0x150ba00a7530>, <reverb.libpybind.WeakCellRef object at 0x150ba01637b0>, <reverb.libpybind.WeakCellRef object at 0x150ba01669f0>, <reverb.libpybind.WeakCellRef object at 0x150ba0169030>, None] for TrajectoryColumn at path ('reward', slice(35, 40, None))
qstanczyk commented 2 years ago

Are you able to reproduce those issues with the default environments so that we can debug it?

kmukeshreddy commented 2 years ago

Hi @qstanczyk , Thank you for your reply.

I am still getting a similar error as before while using the fake environment as mentioned here: https://github.com/deepmind/acme/blob/master/acme/agents/tf/d4pg/agent_distributed_test.py

Implementation (note: I have modified the agent_distributed.py file as per my custom environment requirements; please let me know if you want to walk through it):

            from autohedge.solver.rl.d3pg import agent as agent_d3pg
            import helpers
            from acme import specs
            from acme.testing import fakes
            # print("="*20)
            # print(agent_networks)
            # print("="*20)

            make_environment = lambda x: fakes.ContinuousEnvironment(bounded=True)
            environment = make_environment(False)  # similar to what happens inside acme/agent_distributed.py
            _environment_spec = specs.make_environment_spec(environment)

            rl_agent, agent_networks, agent_name = create_agent_and_networks(environment_spec=_environment_spec,
                                                                    logger=make_logger(work_folder, constants.LEARNER),
                                                                    threshold=threshold, critic=critic, obj_func=obj_func, **agent)

            distributed_agent = DistributedD4PG(environment=environment, 
                        networks_dict=agent_networks,
                        agent=agent_d3pg,
                        obj_func = obj_func,
                        critic_loss_type=critic,
                        threshold=threshold,
                        accelerator="GPU",
                        num_actors=2,
                        num_caches=0,
                        environment_spec=_environment_spec,
                        batch_size=agent.batch_size,
                        n_step=agent.n_step,
                        sigma=agent.sigma,
                        discount=agent.discount,
                        policy_optimizer=snt.optimizers.Adam(agent.lr),
                        critic_optimizer=snt.optimizers.Adam(agent.lr),
                        max_actor_steps=init_ttm*frq*train_sim,
                        log_every= 1.0,
                        )
            try:
                program = distributed_agent.build()
                control = lp.launch(program, launch_type=lp.LaunchType.LOCAL_MULTI_THREADING, terminal='current_terminal')
                control.wait()

Error:

[2022-09-14 14:47:35,882][root][INFO] - [Evaluator] Actor Episodes = 1 | Actor Steps = 2 | Episode Length = 43 | Episode Return = 0.0 | Evaluator Episodes = 1 | Evaluator Steps = 43 | Steps Per Second = 17.969
====================

========== ERROR ==========

Error type: ValueError
Error Description: TrajectoryColumns cannot contain any None data references, got [<reverb.libpybind.WeakCellRef object at 0x154828261030>, <reverb.libpybind.WeakCellRef object at 0x1547700ba030>, <reverb.libpybind.WeakCellRef object at 0x15477007b7f0>, None] for TrajectoryColumn at path ('reward', slice(0, 4, None))
Error line number: 267
Error file: /home/mukeshrk/AutoHedges/experiments/scripts/run_distributed_delta_agent.py

====================
Error executing job with overrides: ['+agent.agent=d3pg', '+spread=0.01', '+logger_prefix=test-d4pg']
Traceback (most recent call last):
  File "/home/mukeshrk/AutoHedges/experiments/scripts/run_distributed_delta_agent.py", line 267, in run
    control.wait()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 415, in wait
    raise failure
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 461, in _check_workers
    worker.future.result()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/concurrent/futures/_base.py", line 438, in result
    return self.__get_result()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 238, in run_inner
    future.set_result(f())
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/nodes/python/node.py", line 75, in _construct_function
    return functools.partial(self._function, *args, **kwargs)()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/nodes/courier/node.py", line 130, in run
    instance.run()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/environment_loop.py", line 176, in run
    result = self.run_episode()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/environment_loop.py", line 109, in run_episode
    self._actor.observe(action, next_timestep=timestep)
  File "/home/mukeshrk/AutoHedges/autohedge/solver/rl/common/actors.py", line 86, in observe
    self._adder.add(action, next_timestep)
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/adders/reverb/transition.py", line 133, in add
    super().add(*args, **kwargs)
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/adders/reverb/base.py", line 207, in add
    self._write()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/adders/reverb/transition.py", line 169, in _write
    reward, discount = tree.map_structure(
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/tree/__init__.py", line 430, in map_structure
    [func(*args) for args in zip(*map(flatten, structures))])
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/tree/__init__.py", line 430, in <listcomp>
    [func(*args) for args in zip(*map(flatten, structures))])
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/adders/reverb/transition.py", line 151, in <lambda>
    get_all_np = lambda x: x[self._first_idx:self._last_idx].numpy()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/reverb/trajectory_writer.py", line 595, in __getitem__
    return TrajectoryColumn(self._slice(val), path=path)
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/reverb/trajectory_writer.py", line 620, in __init__
    raise ValueError('TrajectoryColumns cannot contain any None data '
ValueError: TrajectoryColumns cannot contain any None data references, got [<reverb.libpybind.WeakCellRef object at 0x154828261030>, <reverb.libpybind.WeakCellRef object at 0x1547700ba030>, <reverb.libpybind.WeakCellRef object at 0x15477007b7f0>, None] for TrajectoryColumn at path ('reward', slice(0, 4, None))

Please let me know if any additional details are required, or if I am missing anything here.

kmukeshreddy commented 2 years ago

Hello @qstanczyk , any insight/update on the issue?

sinopalnikov commented 2 years ago

Hi Mukesh, which version of Acme do you use? Could you share the definition of DistributedD4PG?

kmukeshreddy commented 2 years ago

Hello @sinopalnikov , previously I was using the master version, but currently I am using 0.4.0. I still get the same errors.

Definition of DistributedD4PG:

# python3
# Copyright 2018 DeepMind Technologies Limited. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines the D4PG agent class."""

import copy
from typing import Any, Dict, Optional

import acme
import dm_env
import launchpad as lp
import reverb
import sonnet as snt
import tensorflow as tf
from acme import specs
from acme.adders import reverb as reverb_adders

# from acme.agents.tf.d4pg import agent
from acme.tf import savers as tf2_savers
from acme.utils import counting, loggers, lp_utils

from autohedge.common import constants

class DistributedD4PG:
    """Program definition for D4PG."""

    def __init__(
        self,
        environment: dm_env.Environment = None,
        networks_dict: Dict[str, snt.Module] = None,
        agent: Any = None,
        obj_func: str = constants.MEAN_STD,
        critic_loss_type: str = constants.QR,
        threshold: float = 0.95,
        num_actors: int = 1,
        num_caches: int = 0,
        environment_spec: Optional[specs.EnvironmentSpec] = None,
        batch_size: int = 256,
        prefetch_size: int = 4,
        min_replay_size: int = 1000,
        max_replay_size: int = 1000000,
        samples_per_insert: Optional[float] = 32.0,
        n_step: int = 5,
        sigma: float = 0.3,
        clipping: bool = True,
        discount: float = 1.0,
        policy_optimizer: Optional[snt.Optimizer] = None,
        critic_optimizer: Optional[snt.Optimizer] = None,
        target_update_period: int = 100,
        max_actor_steps: Optional[int] = None,
        log_every: float = 1.0,
        replay_table_name: str = reverb_adders.DEFAULT_PRIORITY_TABLE,
    ):
        self.agent = agent

        if not environment_spec:
            environment_spec = specs.make_environment_spec(environment)

        # TODO(mwhoffman): Make network_factory directly return the struct.
        # TODO(mwhoffman): Make the factory take the entire spec.
        def wrapped_network_factory(networks_dict):

            # networks_dict = network_factory(action_spec)
            networks = self.agent.D4PGNetworks(
                policy_network=networks_dict.get("policy"),
                critic_network=networks_dict.get("critic"),
                observation_network=networks_dict.get("observation", tf.identity),
            )
            return networks

        self._environment = environment
        self._network_factory = wrapped_network_factory(networks_dict)
        self._environment_spec = environment_spec
        self._sigma = sigma
        self._num_actors = num_actors
        self._num_caches = num_caches
        self._max_actor_steps = max_actor_steps
        self._log_every = log_every

        self._builder = self.agent.D4PGBuilder(
            # TODO(mwhoffman): pass the config dataclass in directly.
            # TODO(mwhoffman): use the limiter rather than the workaround below.
            self.agent.D4PGConfig(
                obj_func=obj_func,
                critic_loss_type=critic_loss_type,
                threshold=threshold,
                discount=discount,
                batch_size=batch_size,
                prefetch_size=prefetch_size,
                target_update_period=target_update_period,
                policy_optimizer=policy_optimizer,
                critic_optimizer=critic_optimizer,
                min_replay_size=min_replay_size,
                max_replay_size=max_replay_size,
                samples_per_insert=samples_per_insert,
                n_step=n_step,
                sigma=sigma,
                clipping=clipping,
                replay_table_name=replay_table_name,
            )
        )

    def replay(self):
        """The replay storage."""
        return self._builder.make_replay_tables(self._environment_spec)

    def counter(self):
        return tf2_savers.CheckpointingRunner(counting.Counter(), time_delta_minutes=1, subdirectory="counter")

    def coordinator(self, counter: counting.Counter):
        return lp_utils.StepsLimiter(counter, self._max_actor_steps)

    def learner(self, replay: reverb.Client, counter: counting.Counter):
        """The Learning part of the agent."""

        # Create the networks to optimize (online) and target networks.
        online_networks = self._network_factory
        target_networks = copy.deepcopy(online_networks)

        # Initialize the networks.
        online_networks.init(self._environment_spec)
        target_networks.init(self._environment_spec)

        dataset = self._builder.make_dataset_iterator(replay)

        counter = counting.Counter(counter, "learner")
        logger = loggers.make_default_logger("learner", time_delta=self._log_every, steps_key="learner_steps")

        return self._builder.make_learner(
            networks=(online_networks, target_networks),
            dataset=dataset,
            counter=counter,
            logger=logger,
            checkpoint=True,
        )

    def actor(
        self, replay: reverb.Client, variable_source: acme.VariableSource, counter: counting.Counter
    ) -> acme.EnvironmentLoop:
        """The actor process."""

        # Create the behavior policy.
        networks = self._network_factory
        networks.init(self._environment_spec)
        policy_network = networks.make_policy(environment_spec=self._environment_spec, sigma=self._sigma)

        # Create the agent.
        actor = self._builder.make_actor(
            policy_network=policy_network, adder=self._builder.make_adder(replay), variable_source=variable_source
        )

        # # Create the environment.
        # environment = self._environment_factory(False)

        # Create logger and counter; actors will not spam bigtable.
        counter = counting.Counter(counter, "actor")
        logger = loggers.make_default_logger(
            "actor", save_data=False, time_delta=self._log_every, steps_key="actor_steps"
        )

        # Create the loop to connect environment and agent.
        return acme.EnvironmentLoop(self._environment, actor, counter, logger)

    def evaluator(
        self, variable_source: acme.VariableSource, counter: counting.Counter, logger: Optional[loggers.Logger] = None
    ):
        """The evaluation process."""

        # Create the behavior policy.
        networks = self._network_factory
        networks.init(self._environment_spec)
        policy_network = networks.make_policy(self._environment_spec)

        # Create the agent.
        actor = self._builder.make_actor(policy_network=policy_network, variable_source=variable_source)

        # # Make the environment.
        # environment = self._environment_factory(True)

        # Create logger and counter.
        counter = counting.Counter(counter, "evaluator")
        logger = logger or loggers.make_default_logger(
            "evaluator", time_delta=self._log_every, steps_key="evaluator_steps"
        )

        # Create the run loop and return it.
        return acme.EnvironmentLoop(self._environment, actor, counter, logger)

    def build(self, name="d4pg"):
        """Build the distributed agent topology."""
        program = lp.Program(name=name)

        with program.group("replay"):
            replay = program.add_node(lp.ReverbNode(self.replay))

        with program.group("counter"):
            counter = program.add_node(lp.CourierNode(self.counter))

        if self._max_actor_steps:
            with program.group("coordinator"):
                _ = program.add_node(lp.CourierNode(self.coordinator, counter))

        with program.group("learner"):
            learner = program.add_node(lp.CourierNode(self.learner, replay, counter))

        with program.group("evaluator"):
            program.add_node(lp.CourierNode(self.evaluator, learner, counter))

        if not self._num_caches:
            # Use our learner as a single variable source.
            sources = [learner]
        else:
            with program.group("cacher"):
                # Create a set of learner caches.
                sources = []
                for _ in range(self._num_caches):
                    cacher = program.add_node(lp.CacherNode(learner, refresh_interval_ms=2000, stale_after_ms=4000))
                    sources.append(cacher)

        with program.group("actor"):
            # Add actors which pull round-robin from our variable sources.
            for actor_id in range(self._num_actors):
                source = sources[actor_id % len(sources)]
                program.add_node(lp.CourierNode(self.actor, replay, source, counter))

        return program

Please let me know if any additional details are required.

sinopalnikov commented 2 years ago

Does it work for you without your changes?

kmukeshreddy commented 2 years ago

@sinopalnikov : No, the errors were similar. It didn't work in either case.

kmukeshreddy commented 2 years ago

@sinopalnikov : To confirm, could you let me know which changes you are referring to?

Please let me know if there is any correction needed in the DistributedD4PG Definition.

sinopalnikov commented 2 years ago

I'm confused; the definition you pasted matches neither master nor 0.4.0.

kmukeshreddy commented 2 years ago

Hi @sinopalnikov , I am using a custom environment, so I made the changes as per my setup (there isn't much difference from master or 0.4.0).

The major differences are:

  1. Passing a static environment in place of the callable environment_factory.
  2. Passing a networks dict as a param in place of network_factory.
  3. Passing the D4PG agent as a param.

Please let me know if it is not clear.

kmukeshreddy commented 2 years ago

@sinopalnikov / @qstanczyk : A gentle reminder regarding the request.

sinopalnikov commented 2 years ago

Can you reproduce this error if you use the fake environment as above and the DistributedD4PG class without the changes you made for your setup?

kmukeshreddy commented 2 years ago

  1. When I use the fake environment with a custom DistributedD4PG setup, I am getting the same errors.
  2. When I use the fake environment with the original DistributedD4PG setup, I am not getting any errors.

I am not sure why that is. I haven't made any drastic changes to the custom DistributedD4PG code, only passing the environment as a static instance.

Could you please let me know how I should proceed?

sinopalnikov commented 2 years ago

You can try to split the changes you made to DistributedD4PG into small self-contained pieces, then figure out which piece causes the error.

Also, please share the stack trace of the error that you get with the fake environment and the custom DistributedD4PG setup.

kmukeshreddy commented 1 year ago

@sinopalnikov : Thank you for the reply.

Regarding debugging manually: as per my observation, there were no errors while building the program (.build()). When launching the program using lp.launch, I get the errors mentioned below. Please let me know if there is a better way to debug the actor/learner steps in the environment loop.

Stack trace for the fake environment with the custom DistributedD4PG setup:

Output for traceback.print_exc():

  File "/home/mukeshrk/AutoHedges/experiments/scripts/run_distributed_delta_agent.py", line 272, in run
    control.wait()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 415, in wait
    raise failure
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 461, in _check_workers
    worker.future.result()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/concurrent/futures/_base.py", line 438, in result
    return self.__get_result()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/launch/worker_manager.py", line 238, in run_inner
    future.set_result(f())
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/nodes/python/node.py", line 75, in _construct_function
    return functools.partial(self._function, *args, **kwargs)()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/launchpad/nodes/courier/node.py", line 130, in run
    instance.run()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/environment_loop.py", line 176, in run
    result = self.run_episode()
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/acme/environment_loop.py", line 125, in run_episode
    episode_return = tree.map_structure(operator.iadd,
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/tree/__init__.py", line 430, in map_structure
    [func(*args) for args in zip(*map(flatten, structures))])
  File "/home/mukeshrk/anaconda3/envs/ah/lib/python3.9/site-packages/tree/__init__.py", line 430, in <listcomp>
    [func(*args) for args in zip(*map(flatten, structures))])
numpy.core._exceptions._UFuncOutputCastingError: Cannot cast ufunc 'add' output from dtype('O') to dtype('float32') with casting rule 'same_kind' 

Output for pdb.set_trace():

> /home/mukeshrk/AutoHedges/experiments/scripts/run_distributed_delta_agent.py(308)run()
-> print("1="*20)
(Pdb) WARNING:tensorflow:Calling GradientTape.gradient on a persistent tape inside its context is significantly less efficient than calling it outside the context (it causes the gradient ops to be recorded on the tape, leading to increased CPU and memory usage). Only call GradientTape.gradient inside the context if you actually want to trace the gradient in order to compute higher order derivatives.
[2022-09-25 02:40:15,622][tensorflow][WARNING] - Calling GradientTape.gradient on a persistent tape inside its context is significantly less efficient than calling it outside the context (it causes the gradient ops to be recorded on the tape, leading to increased CPU and memory usage). Only call GradientTape.gradient inside the context if you actually want to trace the gradient in order to compute higher order derivatives.
[reverb/cc/client.cc:165] Sampler and server are owned by the same process (1783177) so Table priority_table is accessed directly without gRPC.
Worker groups that did not terminate in time: ['replay', 'counter', 'coordinator', 'learner', 'evaluator', 'actor']

Killing entire runtime.
Killed

Please let me know if any additional details are required.

sinopalnikov commented 1 year ago

The _UFuncOutputCastingError indicates there is a mismatch between the environment reward spec and the actual reward returned by the environment. You can use pdb to inspect 'episode_return' and 'timestep.reward' at 'environment_loop.py:125' when the exception occurs.
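
To make the check concrete, here is a minimal sketch (not part of Acme; the helper name and the use of generate_value() for actions are illustrative) that steps an environment and asserts that the rewards it returns match its reward spec:

import numpy as np
import tree

from acme import specs

def check_reward_matches_spec(environment, num_steps: int = 10):
    """Steps the environment and compares returned rewards against its reward spec."""
    env_spec = specs.make_environment_spec(environment)
    environment.reset()
    for _ in range(num_steps):
        # generate_value() produces a valid zero-filled action for the spec.
        action = env_spec.actions.generate_value()
        timestep = environment.step(action)

        def _check(spec, reward):
            reward = np.asarray(reward)
            assert reward.dtype == spec.dtype, (
                f"reward dtype {reward.dtype} does not match spec dtype {spec.dtype}")
            assert reward.shape == spec.shape, (
                f"reward shape {reward.shape} does not match spec shape {spec.shape}")

        tree.map_structure(_check, env_spec.rewards, timestep.reward)
        if timestep.last():
            environment.reset()

If the check fails with a Python scalar or object-dtype reward, a common fix is to cast the reward to the spec dtype (e.g. np.float32) inside the environment's step().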

kmukeshreddy commented 1 year ago

@sinopalnikov : Thank you for the reply.

Regarding the suggested debugging: when I use the same environment with D4PG (not distributed), I am not getting any errors. But when the same environment is used with DistributedD4PG, the errors pop up.

I am not sure what's going wrong here.

To add more: if there were any issue in the environment or the environment loop, I shouldn't be able to run the D4PG agent either. But I am only getting errors while using DistributedD4PG.

kmukeshreddy commented 1 year ago

@sinopalnikov : Update regarding the errors.

I was able to resolve the _UFuncOutputCastingError by making the static environment callable. (I am not sure why I get those errors when the environment is passed as a static instance.)
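
For reference, a rough sketch of the change (mirroring the factory pattern the stock agent uses; the commented lines and names are illustrative, not the exact code from my setup):

from acme.testing import fakes

# Before: a single environment instance, shared by every actor/evaluator node.
# environment = fakes.ContinuousEnvironment(bounded=True)
# distributed_agent = DistributedD4PG(environment=environment, ...)

# After: a callable factory, so each node constructs its own environment.
def environment_factory(evaluation: bool = False):
    return fakes.ContinuousEnvironment(bounded=True)

# Inside the custom DistributedD4PG, actor() and evaluator() then build their
# own instances, as in the original agent:
#   environment = self._environment_factory(False)  # in actor()
#   environment = self._environment_factory(True)   # in evaluator()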

But there is a new error: the learner is not able to terminate. I am getting the same error with the acme example file agent_distributed_test.py: https://github.com/deepmind/acme/blob/0.4.0/acme/agents/tf/d4pg/agent_distributed_test.py

I0927 00:54:54.442791 22436548863744 terminal.py:91] [Learner] Actor Episodes = 32 | Actor Steps = 800 | Critic Loss = 0.710 | Evaluator Episodes = 204 | Evaluator Steps = 5100 | Learner Steps = 963 | Learner Walltime = 10.689 | Policy Loss = 0.000
Worker groups that did not terminate in time: ['learner']

Killing entire runtime.
Killed

Could you please let me know how I could resolve this?

agent_distributed_test.py code (only the running method was changed):

# python3
# Copyright 2018 DeepMind Technologies Limited. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Integration test for the distributed agent."""

from absl.testing import absltest
import acme
from acme import specs
from acme.agents.tf import d4pg
from acme.testing import fakes
from acme.tf import networks
from acme.tf import utils as tf2_utils
import launchpad as lp
import numpy as np
import sonnet as snt

def make_networks(action_spec: specs.BoundedArray):
  """Simple networks for testing.."""

  num_dimensions = np.prod(action_spec.shape, dtype=int)

  policy_network = snt.Sequential([
      networks.LayerNormMLP([50], activate_final=True),
      networks.NearZeroInitializedLinear(num_dimensions),
      networks.TanhToSpec(action_spec)
  ])
  # The multiplexer concatenates the (maybe transformed) observations/actions.
  critic_network = snt.Sequential([
      networks.CriticMultiplexer(
          critic_network=networks.LayerNormMLP(
              [50], activate_final=True)),
      networks.DiscreteValuedHead(-1., 1., 10)
  ])

  return {
      'policy': policy_network,
      'critic': critic_network,
      'observation': tf2_utils.batch_concat,
  }

class DistributedAgentTest(absltest.TestCase):
  """Simple integration/smoke test for the distributed agent."""

  def test_control_suite(self):
    """Tests that the agent can run on the control suite without crashing."""

    agent = d4pg.DistributedD4PG(
        environment_factory=lambda x: fakes.ContinuousEnvironment(bounded=True),
        network_factory=make_networks,
        num_actors=2,
        batch_size=32,
        min_replay_size=32,
        max_replay_size=1000,
        log_every= 1.0,
        max_actor_steps=100+10,
    )
    program = agent.build()

    try:
        control = lp.launch(program, launch_type=lp.LaunchType.LOCAL_MULTI_THREADING, terminal='current_terminal')
        control.wait()
    except:
        pass
    finally:
        print("Done")

if __name__ == '__main__':
  absltest.main()

Please let me know if any additional details are required.

kmukeshreddy commented 1 year ago

@sinopalnikov / @qstanczyk : A gentle reminder regarding the request.

qstanczyk commented 1 year ago

Worker groups that did not terminate in time: ['learner'] - this is because the Learner is waiting to load data from Reverb, which is uninterruptible, resulting in the learner being killed the hard way. Other than the warning message it should not cause other trouble, does it?

kmukeshreddy commented 1 year ago

Hello @qstanczyk ,

Thank you for the reply!

Due to this error, my program crashes without executing the other blocks of code below control.wait().

                control = lp.launch(program, launch_type=lp.LaunchType.LOCAL_MULTI_THREADING, terminal='current_terminal')
                control.wait()
                print("Next block")
 = 9540 | Learner Steps = 843 | Learner Walltime = 31.804 | Steps Per Second = 103.071
Worker groups that did not terminate in time: ['learner']

Killing entire runtime.
Killed

I was able to get all the logs and saved models.

But my goal is to load the policy and use it for evaluation as the next step.

qstanczyk commented 1 year ago

You can do control.wait(labels_to_wait_for=['actor']) to wait for actors to do the job instead. Hope that helps.
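
In the launch script that would look roughly like this (variable names follow the earlier snippets in this thread):

control = lp.launch(program,
                    launch_type=lp.LaunchType.LOCAL_MULTI_THREADING,
                    terminal='current_terminal')

# Wait only for the 'actor' group instead of every worker group, so the
# learner's blocking Reverb read does not keep the script from continuing.
control.wait(labels_to_wait_for=['actor'])
print("Next block")  # now reached once the actors finish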

kmukeshreddy commented 1 year ago

@qstanczyk : Thanks a lot, that helped!

Could you please let me know if there is any way to store the model (policy & critic) weights at the last step before termination? I observed that the weights are stored in the middle of execution, not at the last step.

kmukeshreddy commented 1 year ago

@qstanczyk : A gentle reminder!

sinopalnikov commented 1 year ago

@kmukeshreddy Unfortunately this is not supported. As a workaround you could increase the frequency of snapshotting, or try tweaking lp_utils.StepsLimiter to call the snapshotter before stopping the program.

kmukeshreddy commented 1 year ago

@sinopalnikov : Thank you for the reply!

Regarding the solutions you proposed:

  1. I was not able to find a way to increase the frequency of snapshots.
  2. How could I use lp_utils.StepsLimiter to call the snapshot?

kmukeshreddy commented 1 year ago

@sinopalnikov : There is one more issue that I am facing: the program does not terminate after the successful execution of all the lines.


It's similar to running time.sleep(float('inf')) as the last step.

Could you please let me know how to resolve this? I was only able to observe this in Distributed D4PG execution.

kmukeshreddy commented 1 year ago

@sinopalnikov : A gentle reminder!

sinopalnikov commented 1 year ago

  1. I was not able to find a way to increase the frequency of snapshots.

You would need to modify the learner to pass the desired snapshot interval (time_delta_minutes) here
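
For illustration, something along these lines (a sketch, not the exact learner code; the helper name and saved objects are placeholders):

import sonnet as snt
from acme.tf import savers as tf2_savers

def make_frequent_snapshotter(policy_network: snt.Module) -> tf2_savers.Snapshotter:
    # Snapshot roughly every minute instead of the much less frequent default.
    return tf2_savers.Snapshotter(
        objects_to_save={'policy': policy_network},
        time_delta_minutes=1.0)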

  2. How could I use lp_utils.StepsLimiter to call the snapshot?

Roughly, the idea is to add a method to the learner:

  def snapshot(self):
    self._snapshotter.save(force=True)

Then modify StepsLimiter to take the learner as an additional argument. When the number of steps reaches the limit, call learner.snapshot() before the program is terminated here.
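
A rough sketch of that idea (illustrative only, not the actual lp_utils.StepsLimiter implementation; it assumes the learner exposes the snapshot() method above and is passed in when the node is created):

import time

import launchpad as lp
from acme.utils import counting

class SnapshottingStepsLimiter:
    """Stops the program once max_steps is reached, snapshotting the learner first."""

    def __init__(self, counter: counting.Counter, max_steps: int,
                 learner, steps_key: str = 'actor_steps'):
        self._counter = counter
        self._max_steps = max_steps
        self._learner = learner
        self._steps_key = steps_key

    def run(self):
        while True:
            counts = self._counter.get_counts()
            if counts.get(self._steps_key, 0) >= self._max_steps:
                self._learner.snapshot()  # force a final save of the weights
                lp.stop()                 # then terminate every worker group
                return
            time.sleep(10.0)

In build() the coordinator group would then be created after the learner group, so the learner handle can be passed in, e.g. program.add_node(lp.CourierNode(self.coordinator, counter, learner)).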

sinopalnikov commented 1 year ago

It's similar to running time.sleep(float('inf')) as the last step.

Please try to figure out where exactly the program hangs. You should be able to do it with pdb, for example.

sinopalnikov commented 1 month ago

Closing the issue, please feel free to reopen if further assistance is needed.