Closed Vogen closed 7 years ago
see mx.model._update_params_on_kvstore
Thanks a lot! I read your implementation of A3C again (#4313), and I'm not sure whether I understand the dist kvstore correctly. I modified some of your code; here is how I use the dist kvstore manually on Windows 10.
launcher.py
import sys
import os
import subprocess
import logging
from threading import Thread
import argparse
import signal
sys.path.append(os.environ['MXNET_HOME'] + "/dmlc-core/tracker")
from dmlc_tracker import tracker
def exec_cmd(cmd, role, taskid, pass_env):
    """Run one job command in a shell until it exits successfully.

    Parameters
    ----------
    cmd : list of str
        Command and its arguments. The pieces are joined with spaces and
        run with ``shell=True``. The caller's list is left unmodified.
    role : str
        Value exported to the child as ``DMLC_ROLE``.
    taskid : int
        Value exported to the child (as a string) as ``DMLC_TASK_ID``.
    pass_env : dict
        Extra environment variables for the child; every value is
        converted with ``str``.

    Returns
    -------
    None
        Returned as soon as the command exits with status 0. A non-zero
        exit status starts another attempt with ``DMLC_NUM_ATTEMPT``
        incremented; there is no upper bound on the number of attempts.
    """
    # Work on a copy: callers may hand the same list to several threads,
    # so the './' prefix must not be written back into their list.
    cmd = list(cmd)
    if cmd[0].find('/') == -1 and os.path.exists(cmd[0]) and os.name != 'nt':
        cmd[0] = './' + cmd[0]
    shell_cmd = ' '.join(cmd)

    env = os.environ.copy()
    env.update((k, str(v)) for k, v in pass_env.items())
    env['DMLC_TASK_ID'] = str(taskid)
    env['DMLC_ROLE'] = role
    env['DMLC_JOB_CLUSTER'] = 'local'

    ntrial = 0
    while True:
        env['DMLC_NUM_ATTEMPT'] = str(ntrial)
        ret = subprocess.call(shell_cmd, shell=True, env=env)
        if ret == 0:
            # Success: stop here instead of launching the command again.
            return
        ntrial += 1
def submit(args):
    """Launch the job locally, using threads to spawn each process.

    ``args`` must provide ``command`` (list of str), ``num_threads`` and
    ``num_servers``; they are forwarded to ``tracker.submit``.
    """

    def mthread_submit(nworker, nserver, envs):
        """Start one daemon thread per worker and per server task.

        Task indices ``0 .. nworker - 1`` get the role ``'worker'``; the
        remaining ``nserver`` indices get the role ``'server'``.

        Parameters
        ----------
        nworker: number of worker processes to start up
        nserver: number of server nodes to start up
        envs: environment variables to be added to the starting programs
        """
        threads = {}
        total = nworker + nserver
        for task_id in range(total):
            role = 'worker' if task_id < nworker else 'server'
            runner = Thread(target=exec_cmd,
                            args=(args.command, role, task_id, envs))
            # Daemon threads do not keep the launcher alive on exit.
            runner.setDaemon(True)
            threads[task_id] = runner
            runner.start()

    # Hand the spawner to the tracker as its fun_submit callback, together
    # with the requested counts and the command line as a single string.
    command_line = ' '.join(args.command)
    tracker.submit(args.num_threads, args.num_servers,
                   fun_submit=mthread_submit, pscmd=command_line)
def signal_handler(signal, frame):
    """Log that the launcher is stopping, then exit with status 0.

    Both parameters exist only to match the signal-handler calling
    convention; neither is read.
    """
    logging.info('Stop luancher')
    raise SystemExit(0)
if __name__ == '__main__':
    # Command-line interface: thread count, optional server count, and
    # the command that every worker/server task will run.
    parser = argparse.ArgumentParser(description='Launch a distributed job')
    parser.add_argument(
        '-n', '--num-threads', type=int, required=True,
        help='number of threads per gpu')
    parser.add_argument(
        '-s', '--num-servers', type=int,
        help=('number of server nodes to be launched, '
              'in default it is equal to NUM_WORKERS'))
    parser.add_argument(
        'command', nargs='+',
        help='command for launching the program')

    # Options argparse does not recognise are appended to the command.
    args, extras = parser.parse_known_args()
    args.command.extend(extras)

    # Without -s, start as many servers as threads.
    if args.num_servers is None:
        args.num_servers = args.num_threads

    signal.signal(signal.SIGINT, signal_handler)
    submit(args)
main.py
import mxnet as mx
import numpy as np
import time
import os
# Task index read from the environment; named task_id so the builtin
# `id` is not shadowed.
task_id = os.environ['DMLC_TASK_ID']

kv = mx.kvstore.create("dist_device_sync")

# Buffer that receives the pulled value.
a = mx.nd.ones((1, 1))

# Register key 0 with an all-zero (1, 1) value.
kv.init(0, a * 0)

for x in range(5):
    # Push a (1, 1) array filled with x, then read key 0 back into `a`.
    kv.push(0, mx.nd.ones((1, 1)) * x)
    kv.pull(0, out=a)
    # One formatted string prints the same text as `print id, a.asnumpy()`
    # on Python 2 and is also valid on Python 3.
    print("%s %s" % (task_id, a.asnumpy()))
Use cmd to run the launcher:
python launcher.py -n 2 python main.py
cmd output
10 [[[ 0.]][ 0.]]
01 [[ 2.]][
[ 2.]]
01 [[ 4.]]
[[ 4.]]
01 [[[ 6.]][ 6.]]
01 [[[ 8.]][ 8.]]
This issue is closed due to lack of activity in the last 90 days. Feel free to reopen if this is still an active issue. Thanks!
When using the dist kvstore, launch.py will create several subprocesses with different DMLC_ROLE values: "scheduler", "worker", or "server". After that, the kvstore works automatically. I want to push to and pull from the dist kvstore manually, just by starting several processes with configured environments and calling kv.push. Is that possible?