Please check locally whether it is running out of memory.
The code that triggers the error is as follows:
from __future__ import print_function
import os
#import sys
import argparse
from PIL import Image
import numpy as np
import paddle
import paddle.fluid as fluid
import io
import math
#import paddlecloud.visual_util as visualdl
#import paddlecloud.model_util as model_util
#mu = model_util.ModelUtil()
img_emb_path = "./thirdparty/total_images_emb"
title_emb_path = "./thirdparty/total_titles_emb"
entity_emb_path = "./thirdparty/total_entitys_emb"
train_mapfile = "./train_data"
val_mapfile = "./test_data"
save_dirname = "./output/model/"
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--cpu_num', type=str, default='1', help="train cpu number")
    parser.add_argument('--num_epochs', type=int, default=3, help="number of epochs")
    parser.add_argument('--use_parallel_exe', type=int, default=0, help="if use parallel_executor")
    args = parser.parse_args()
    return args
def data_reader(mapfile, img_emb_path, title_emb_path, entity_emb_path):
    def reader():
        files = os.listdir(mapfile)
        for fi in files:
            with io.open(mapfile + '/' + fi, 'r', encoding='utf-8') as fr:
                for line in fr:
                    cols = line.strip().split('\t')
                    if len(cols) != 9:
                        continue
                    ideaid, imgname, img_num, titleid, mt_idx, trade1_idx, trade2_idx, entity_idx, label = cols
                    imgname_list = imgname.strip().split('#')
                    if len(imgname_list) == 1:
                        img_emb1 = np.loadtxt(os.path.join(img_emb_path, imgname + '.jpg.txt'))
                        img_emb2 = np.loadtxt(os.path.join(img_emb_path, imgname + '.jpg.txt'))
                        img_emb3 = np.loadtxt(os.path.join(img_emb_path, imgname + '.jpg.txt'))
                    elif len(imgname_list) == 3:
                        img_emb1 = np.loadtxt(os.path.join(img_emb_path, imgname_list[0] + '.jpg.txt'))
                        img_emb2 = np.loadtxt(os.path.join(img_emb_path, imgname_list[1] + '.jpg.txt'))
                        img_emb3 = np.loadtxt(os.path.join(img_emb_path, imgname_list[2] + '.jpg.txt'))
                    else:
                        continue
                    titleid = titleid + '.txt'
                    entity_idx = entity_idx + '.txt'
                    title_emb = np.loadtxt(os.path.join(title_emb_path, titleid))
                    entity_emb = np.loadtxt(os.path.join(entity_emb_path, entity_idx))
                    fea_vec = np.concatenate((img_emb1, img_emb2, img_emb3, title_emb, entity_emb), axis=0)
                    # print(len(fea_vec), img_num, mt_idx, trade1_idx, trade2_idx, label)
                    yield fea_vec, int(img_num), int(mt_idx), int(trade1_idx), int(trade2_idx), int(label)
    return reader
def nccl2_prepare(main_prog, startup_prog):
"""
prepare nccl2
"""
port = os.getenv("PADDLE_PORT", "6174")
trainer_ips = os.getenv("PADDLE_TRAINERS")
eplist = []
for ip in trainer_ips.split(","):
eplist.append(':'.join([ip, port]))
trainer_endpoints = ",".join(eplist) # ip:port,ip:port...
current_endpoint = os.getenv("POD_IP") + ":" + port
config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
t = fluid.DistributeTranspiler(config=config)
t.transpile(
int(os.getenv("PADDLE_TRAINER_ID", "0")),
trainers=trainer_endpoints,
current_endpoint=current_endpoint,
startup_program=startup_prog,
program=main_prog)
def pserver_prepare(train_prog, startup_prog):
"""
prepare pserver
"""
port = os.getenv("PADDLE_PORT", "6174")
trainer_ips = os.getenv("PADDLE_PSERVERS")
eplist = []
for ip in trainer_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist) # ip:port,ip:port...
current_endpoint = os.getenv("POD_IP") + ":" + port
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
config = fluid.DistributeTranspilerConfig()
config.slice_var_up = False
t = fluid.DistributeTranspiler(config=config)
t.transpile(
int(os.getenv("PADDLE_TRAINER_ID", "0")),
program=train_prog,
pservers=pserver_endpoints,
trainers=int(os.getenv("PADDLE_TRAINERS_NUM", "0")),
startup_program=startup_prog)
if training_role == "PSERVER":
pserver_program = t.get_pserver_program(current_endpoint)
pserver_startup_program = t.get_startup_program(
current_endpoint,
pserver_program,
startup_program=startup_prog)
return pserver_program, pserver_startup_program
elif training_role == "TRAINER":
train_program = t.get_trainer_program()
return train_program, startup_prog
else:
raise ValueError('TRAINING_ROLE environment variable must be either TRAINER or PSERVER')
def run_pserver(train_prog, startup_prog):
"""
run pserver
"""
server_exe = fluid.Executor(fluid.CPUPlace())
server_exe.run(startup_prog)
server_exe.run(train_prog)
def bceloss(predict, label):
    softmax_vec = fluid.layers.concat([fluid.layers.elementwise_sub(fluid.layers.fill_constant_batch_size_like(predict, [-1, 1], 'float64', 1), predict), predict], axis=1)
    return softmax_vec
def loss_net(predict, label):
    softmax_vec = bceloss(predict=predict, label=label)
    loss = fluid.layers.cross_entropy(input=softmax_vec, label=label)
    avg_loss = fluid.layers.mean(loss)
    return predict, avg_loss
def multilayer_perceptron(concat_emb, label):
    fc1 = fluid.layers.fc(input=concat_emb, size=1024, act='relu')
    fc2 = fluid.layers.fc(input=fc1, size=512, act='relu')
    predict = fluid.layers.fc(input=fc2, size=1, act='sigmoid')
    return loss_net(predict, label)
def train(is_local,
          training_role,
          pserver_endpoints,
          current_endpoint,
          trainer_num,
          trainer_id,
          use_parallel_executor=False,
          use_cuda=False, is_sparse=True):
    print('train start...')
    # define net
    fea_vec = fluid.data(name='fea_vec', shape=[None, 3256], dtype='float64')
    img_num = fluid.data(name='img_num', shape=[None, 1], dtype='float64')
    mt_idx = fluid.data(name='mt_idx', shape=[None, 1], dtype='int64')
    trade1_idx = fluid.data(name='trade1_idx', shape=[None, 1], dtype='int64')
    trade2_idx = fluid.data(name='trade2_idx', shape=[None, 1], dtype='int64')
    mt_emb = fluid.embedding(input=mt_idx, size=[11, 8], dtype='float64')
    trade1_emb = fluid.embedding(input=trade1_idx, size=[32, 8], dtype='float64')
    trade2_emb = fluid.embedding(input=trade2_idx, size=[223, 16], dtype='float64')
    concat_emb = fluid.layers.concat(input=[fea_vec, img_num, mt_emb, trade1_emb, trade2_emb], axis=1)
    net_conf = multilayer_perceptron
    label = fluid.data(name='label', shape=[None, 1], dtype='int64')
    predict, avg_loss = net_conf(concat_emb, label)
    optimizer = fluid.optimizer.Adam(learning_rate=0.00001)
    optimizer.minimize(avg_loss)
    #train_reader = paddle.batch(reader = paddle.reader.shuffle(reader=data_reader(train_mapfile, img_emb_path, title_emb_path, entity_emb_path), buf_size=8), batch_size=4)
    train_reader = paddle.batch(reader=data_reader(train_mapfile, img_emb_path, title_emb_path, entity_emb_path), batch_size=20)
    val_reader = paddle.batch(reader=data_reader(val_mapfile, img_emb_path, title_emb_path, entity_emb_path), batch_size=20)
    # define exe
    startup_program = fluid.default_startup_program()
    main_program = fluid.default_main_program()
    val_program = main_program.clone(for_test=True)
    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    startup_exe = fluid.Executor(place)
    def val_test(val_program, val_feed, val_reader):
        #avg_loss_list = np.array([])
        #predict_result = []
        accum_loss = 0.0
        batch_num = 0
        for val_data in val_reader():
            avg_loss_np = exe.run(feed=val_feed.feed(val_data), fetch_list=[avg_loss])
            batch_num += len(avg_loss_np[0])
            accum_loss += sum(avg_loss_np[0])
            #print(avg_loss_np, batch_num, accum_loss)
        avg_loss_val_mean = accum_loss / batch_num
        return avg_loss_val_mean
    def train_loop():
        feeder = fluid.DataFeeder(feed_list=[fea_vec, img_num, mt_idx, trade1_idx, trade2_idx, label], place=place)
        epochs = [epoch_id for epoch_id in range(EPOCH_NUM)]
        lists = []
        batch = 0
        print('epoch start...')
        for epoch_id in epochs:
            for batch_id, data in enumerate(train_reader()):
                #print("epoch_id:", epoch_id, "batch_id:", batch_id)
                #print("batch data:", feeder.feed(data[:2]))
                #metrics = exe.run(main_program, feed=feeder.feed(data[:2]), fetch_list=[avg_loss])
                metrics = exe.run(feed=feeder.feed(data), fetch_list=[avg_loss])
                if batch_id % 5 == 0:
                    print(metrics)
                    if use_parallel_executor:
                        print("Batch %d, Epoch %d, Parallel Cost %f" % (batch_id, epoch_id, np.mean(metrics[0])))
                    else:
                        print("Batch %d, Epoch %d, Cost %f" % (batch_id, epoch_id, metrics[0]))
            avg_loss_val = val_test(val_program=val_program, val_feed=feeder, val_reader=val_reader)
            print("Val with Epoch %d, avg_cost: %s" % (epoch_id, avg_loss_val))
            lists.append((epoch_id, avg_loss_val))
            if save_dirname is not None:
                #fluid.io.save_inference_model(save_dirname + '/model.' + str(epoch_id), ['fea_vec', 'img_num', 'mt_idx', 'trade1_idx', 'trade2_idx'], \
                #    [predict], startup_exe, model_filename=None, params_filename=None)
                fluid.io.save_inference_model(save_dirname + '/model.' + str(epoch_id), [], [predict], startup_exe, model_filename=None, params_filename=None)
        # find the best pass
        best = sorted(lists, key=lambda list: float(list[1]))[0]
        print('Best epoch is %s, var Avgcost is %s' % (best[0], best[1]))
        if not is_local and os.getenv("DISTRIBUTE_JOB_TYPE") == "PSERVER":
            startup_exe.close()
    if is_local:
        print("is_local")
        startup_exe.run(startup_program)
        if use_parallel_executor:
            exe = fluid.ParallelExecutor(use_cuda=use_cuda, loss_name=avg_loss.name, main_program=main_program)
        else:
            print("not parallel")
            exe = startup_exe
        train_loop()
    elif "NCCL2" == os.getenv("DISTRIBUTE_JOB_TYPE"):
        nccl2_prepare(main_program, startup_program)
        startup_exe.run(startup_program)
        strategy = fluid.ExecutionStrategy()
        strategy.num_threads = 1
        strategy.allow_op_delay = False
        exe = fluid.ParallelExecutor(use_cuda=use_cuda, loss_name=avg_loss.name, exec_strategy=strategy, main_program=main_program,
                                     num_trainers=int(os.getenv("PADDLE_TRAINERS_NUM", "0")), trainer_id=int(os.getenv("PADDLE_TRAINER_ID", "0")))
        train_loop()
    elif "PSERVER" == os.getenv("DISTRIBUTE_JOB_TYPE"):
        print("pserver...")
        main_program, startup_program = pserver_prepare(main_program, startup_program)
        training_role = os.getenv("TRAINING_ROLE", "TRAINER")
        if training_role == "PSERVER":
            print("run_pserver...")
            run_pserver(main_program, startup_program)
        elif training_role == "TRAINER":
            print("trainer...")
            startup_exe.run(startup_program)
            exe = fluid.ParallelExecutor(use_cuda=use_cuda, loss_name=avg_loss.name, main_program=main_program)
            train_loop()
    else:
        raise ValueError('DISTRIBUTE_JOB_TYPE environment variable must be either NCCL2 or PSERVER')
if __name__ == '__main__':
    print('start...')
    port = os.getenv("PADDLE_PORT", "6174")
    pserver_ips = os.getenv("PADDLE_PSERVERS", "")
    eplist = []
    for ip in pserver_ips.split(","):
        eplist.append(':'.join([ip, port]))
    pserver_endpoints = ",".join(eplist)
    trainer_num = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
    current_endpoint = os.getenv("POD_IP", "localhost") + ":" + port
    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
    training_role = os.getenv("TRAINING_ROLE", "TRAINER")
    is_local = bool(int(os.getenv("PADDLE_IS_LOCAL", 0)))
    #is_local = True
    print("is_local:", is_local)
    args = parse_args()
    os.environ['CPU_NUM'] = args.cpu_num
    EPOCH_NUM = args.num_epochs
    train(is_local=is_local,
          training_role=training_role,
          pserver_endpoints=pserver_endpoints,
          current_endpoint=current_endpoint,
          trainer_num=trainer_num,
          trainer_id=trainer_id,
          use_parallel_executor=bool(args.use_parallel_exe))
After communicating with the user offline and debugging further, the problem was traced to the user's concat receiving input tensors of different ranks. The log is as follows:
CPUPlace Op(concat), inputs:{AxisTensor[], X[fea_vec:double[16, 3256], img_num:double[16, 1], embedding_0.tmp_0:double[16, 1, 8], embedding_1.tmp_0:double[16, 1, 8], embedding_2.tmp_0:double[16, 1, 16]]}, outputs:{Out[concat_0.tmp_0:double[16, 3260]]}.
When executing on a CPU device, the concat kernel does not check the input ranks, which eventually caused the buddy_allocator to go out of bounds.
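For reference, one possible way to avoid the mismatch in the script above is to bring every concat input to the same rank before concatenating. This is only a sketch under the assumption that the fluid 1.x API used in the script is available (fluid.layers.squeeze is introduced here as an illustration): fluid.embedding on a [None, 1] input returns a rank-3 tensor of shape [None, 1, emb_dim], while fea_vec and img_num are rank-2.

import paddle.fluid as fluid
# Placeholders as defined in the script above.
fea_vec = fluid.data(name='fea_vec', shape=[None, 3256], dtype='float64')
img_num = fluid.data(name='img_num', shape=[None, 1], dtype='float64')
mt_idx = fluid.data(name='mt_idx', shape=[None, 1], dtype='int64')
trade1_idx = fluid.data(name='trade1_idx', shape=[None, 1], dtype='int64')
trade2_idx = fluid.data(name='trade2_idx', shape=[None, 1], dtype='int64')
# fluid.embedding on a [None, 1] input yields [None, 1, emb_dim]; squeeze the middle
# dimension so that every concat input has rank 2.
mt_emb = fluid.layers.squeeze(fluid.embedding(input=mt_idx, size=[11, 8], dtype='float64'), axes=[1])
trade1_emb = fluid.layers.squeeze(fluid.embedding(input=trade1_idx, size=[32, 8], dtype='float64'), axes=[1])
trade2_emb = fluid.layers.squeeze(fluid.embedding(input=trade2_idx, size=[223, 16], dtype='float64'), axes=[1])
# All inputs now have rank 2, so concat along axis=1 yields [None, 3256 + 1 + 8 + 8 + 16] = [None, 3289].
concat_emb = fluid.layers.concat(input=[fea_vec, img_num, mt_emb, trade1_emb, trade2_emb], axis=1)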
P.S. PyTorch raises an error immediately when concat is given inputs of different ranks; Paddle should strengthen this check as well.
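For comparison, a minimal sketch of the PyTorch behavior mentioned above (assuming a recent torch release): torch.cat rejects tensors whose numbers of dimensions differ, so the mismatch surfaces as an immediate error instead of silent memory corruption.

import torch
a = torch.zeros(16, 3256)  # rank-2, analogous to fea_vec
b = torch.zeros(16, 1, 8)  # rank-3, analogous to the embedding output
try:
    torch.cat([a, b], dim=1)
except RuntimeError as e:
    # PyTorch refuses to concatenate tensors with different numbers of dimensions.
    print("cat failed:", e)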
Thanks, the problem is solved.