openai / spinningup

An educational resource to help anyone learn deep reinforcement learning.
https://spinningup.openai.com/
MIT License

Problem with concurrent testing of multiple stored agents #164

Closed vkotronis closed 5 years ago

vkotronis commented 5 years ago

Hello, I have a custom environment on which I have trained DRL agents using the PPO algorithm. I have saved snapshots of several agents (i.e., trained policies) at multiple epochs, and now I want to test these agents to find out which performs best in the wild. Note that I have trained multiple agents both in space (e.g., same environment but with different tuning parameters) and in time (training epochs). I am trying to test them concurrently in Python, but there seems to be an issue when I run the loaded policies in parallel; e.g., with 2 agents and 2 saved epochs per agent, failures appear after the first epoch is tested (see also the code later in this comment).

The exact issue is:

!!!Running policy for epoch 2
!!!Running policy for epoch 2
Process Process-1:
Traceback (most recent call last):
  File "/home/vkotronis/Desktop/git_projects/DRL/venv_new/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1356, in _do_call
    return fn(*args)
  File "/home/vkotronis/Desktop/git_projects/DRL/venv_new/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1341, in _run_fn
    options, feed_dict, fetch_list, target_list, run_metadata)
  File "/home/vkotronis/Desktop/git_projects/DRL/venv_new/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1429, in _call_tf_sessionrun
    run_metadata)
tensorflow.python.framework.errors_impl.FailedPreconditionError: Attempting to use uninitialized value pi/dense_1/kernel
     [[{{node pi/dense_1/kernel/read}}]]
...
(plus other cascading trace failures not put here for brevity)

Could it be that some kind of TensorFlow lock on the session prevents more than one testing session from being used at the same time? I have studied the load and run policy functions from spinup and no such issue seems to be present in the code, but maybe I am missing something.

My code (with the non-relevant parts removed) is the following:

import argparse
import concurrent.futures
import glob
import gym
import os
import sys
sys.path.insert(0, '../spinningup/spinup')
from spinup.utils.logx import EpochLogger
from spinup.utils.test_policy import load_policy, run_policy
import time

def test_my_gym_env(inner_logdir):
    logdir_name = inner_logdir.split('/')[-1]
    # extract params from logdir name
    <......>

    # create the testing environment
    env = gym.make(
            'my-test',
            param1=...,
            ....
        )

    # setup logger and start counting time (I am misusing the epoch logger for storing also test stuff)
    logger = EpochLogger(output_dir=inner_logdir, output_fname='test_progress.txt')
    start_time = time.time()

    # check what has been saved
    agent_epoch_saves = [int(x[11:]) for x in os.listdir(inner_logdir) if 'simple_save' in x and len(x)>11]

    # test each epoch
    for i,epoch in enumerate(sorted(agent_epoch_saves)):

        # load the currently trained agent/policy for this epoch
        print("!!!Loading policy for epoch {}".format(epoch))
        _, get_action = load_policy(inner_logdir, itr=epoch)

        # run the loaded policy on the test environment
        print("!!!Running policy for epoch {}".format(epoch))
        run_policy(
            env,
            get_action)

        # logging etc.
        <logging>

    return logdir_name

parser = argparse.ArgumentParser(description="test agent")
parser.add_argument("-g", "--exp_logs", dest="exp_logs", type=str, help="exp logs location")
parser.add_argument("-j", "--job_count", dest="job_count", type=int, help='number of parallel jobs')
args = parser.parse_args()

outer_logdir = args.exp_logs.rstrip('/')
inner_logdirs = glob.glob("{}/*".format(outer_logdir))

time_started = time.time()
# concurrent futures for parallel execution
with concurrent.futures.ProcessPoolExecutor(max_workers=args.job_count) as executor:
    done_jobs = executor.map(test_my_gym_env, inner_logdirs)
time_ended = time.time()

Could you help with this please? If possible, simply try to test two different agents in parallel (e.g., using distinct processes) with code like this. Note that I am not doing anything fancy with TensorFlow in the backend; I am simply using the spinup utils and APIs (by the way, many thanks for this repository and the corresponding algorithmic offering!)

vkotronis commented 5 years ago

I have located the issue with parallelism in line https://github.com/openai/spinningup/blob/master/spinup/utils/test_policy.py#L20. It seems that something is going wrong with the restoration process (e.g., some kind of locking), since the session tries to load a new agent but for some reason sees the tf variables from a previously loaded session. See this error:

Process Process-1:
Traceback (most recent call last):
  File "/home/vkotronis/Desktop/git_projects/DRL/venv_new/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1356, in _do_call
    return fn(*args)
  File "/home/vkotronis/Desktop/git_projects/DRL/venv_new/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1341, in _run_fn
    options, feed_dict, fetch_list, target_list, run_metadata)
  File "/home/vkotronis/Desktop/git_projects/DRL/venv_new/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1429, in _call_tf_sessionrun
    run_metadata)
tensorflow.python.framework.errors_impl.InvalidArgumentError: Assign requires shapes of both tensors to match. lhs shape= [128,36] rhs shape= [128,1296]
     [[{{node save_2/Assign_19}}]]

The non-matching tensors belong to different agents that should actually be loaded in parallel. I will check in detail what the restoration function does exactly.

Following this a bit further, it seems that something is not fork-safe at this line in the code: https://github.com/openai/spinningup/blob/master/spinup/utils/logx.py#L57

I am using separate processes and sessions of course, but maybe there is something that is dedicated to a single session at a time.

If you have any ideas in the meantime, please ping here.

vkotronis commented 5 years ago

Solved this by fully parallelizing the entire testing script using subprocess instead of concurrent.futures. Closing the issue.
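
For anyone landing here later, the subprocess approach can be sketched roughly as follows. This is a minimal illustration, not the exact script used: it assumes the per-agent testing logic lives in a standalone script (hypothetically named test_single_agent.py here) that accepts a hypothetical --logdir argument. Each log directory then gets a fresh Python interpreter, so no TensorFlow state is carried over between agents.

```python
import subprocess
import sys


def build_commands(script, logdirs):
    """Build one command line per log directory.

    `script` and the `--logdir` flag are hypothetical names for this
    sketch; adapt them to however the test script takes its input.
    """
    return [[sys.executable, script, "--logdir", d] for d in logdirs]


def run_in_subprocesses(commands, job_count):
    """Run each command in its own interpreter, at most job_count at a time.

    Returns the exit codes in the same order as `commands`.
    """
    exit_codes = [None] * len(commands)
    pending = list(enumerate(commands))
    running = []  # (index, Popen) pairs, oldest first
    while pending or running:
        # Start new children while there is spare capacity.
        while pending and len(running) < job_count:
            index, command = pending.pop(0)
            running.append((index, subprocess.Popen(command)))
        # Block on the oldest child. Simple rather than optimal: a slot
        # is only refilled once the oldest running child has exited.
        index, child = running.pop(0)
        exit_codes[index] = child.wait()
    return exit_codes
```

With the original script's variables, usage would look something like run_in_subprocesses(build_commands("test_single_agent.py", inner_logdirs), args.job_count). A non-zero exit code then marks a log directory whose test run failed.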