PaddlePaddle / Paddle

PArallel Distributed Deep LEarning: Machine Learning Framework from Industrial Practice (『飞桨』核心框架,深度学习&机器学习高性能单机、分布式训练和跨平台部署)
http://www.paddlepaddle.org/
Apache License 2.0
22.14k stars 5.56k forks source link

How DataProvider works in multi nodes training #378

Closed zuowang closed 7 years ago

zuowang commented 7 years ago

When training on multi nodes, trainers on all nodes train a pass together. But trainers on each node get DataBatch from DataProvider independently, how do they know which part of the whole data should it fetch from the source?

qingqing01 commented 7 years ago

When training using multi nodes at cluster in our company, the data are splitter into multi nodes at first before training.

zuowang commented 7 years ago

@hedaoyuan, Could you tell me where is the relevant code of this logical? Thanks!

hedaoyuan commented 7 years ago

@zuowang If you consider a single machine which has multiple nodes inside. The related code is MultiGradientMachine.cpp. Generates multiple TrainerThread based on trainer_count. The API to get the data is TrainerThread::copyInArgs. If you are thinking about training with multiple machines. Such as @qingqing01 said, the data should be splitted into multi nodes before training.

zuowang commented 7 years ago

@hedaoyuan Here is how I run the demo to train on a cluster: will the data be splitted or should I manually split the data before executing run.sh

cd ~/paddle/paddle/scripts/cluster_train

modify conf.py

HOSTS = [
        "root@172.17.0.7",
        "root@172.17.0.8",
        ]

run demo mnist

export PATH_TO_LOCAL_WORKSPACE=/root/paddle/demo/mnist
sh run.sh
backyes commented 7 years ago

@zuowang

will the data be splitted or should I manually split the data before executing run.sh ? 

Yes.

job_dispatch_package set it with local workspacedirectory, it will be dispatched to all nodes set in conf.py. It could be helpful for frequent hacking workspace files, otherwise frequent mulit-nodes workspace deployment could make your crazy. job_workspace set it with already deployed workspace directory, paddle.py will skip dispatch stage to directly launch cluster job with all nodes. It could help to reduce heavy dispatch latency.
zuowang commented 7 years ago

I think it's necessary to implement a data spliter so that it would be quite easy to train in multi nodes.

wangkuiyi commented 7 years ago

Hi @zuowang ,

Thank you for the issue. Yes, I agree it is a good idea to provide a data splitter. Just a step further about the idea of data splitter -- it should be a standalone program, but not part of Paddle framework.

I think so because when we run Paddle with big data, we need to use Hadoop MapReduce or Spark or Hive to pre-process the data, and such tools automatically partition data into chunks. To run small-scale jobs, we can partition data using AWK and Bash. In most cases, I am afraid the partitioning depends on the data format and data file format. I am afraid that it is not easy to create a generally-applicable splitter for labeled audio segments with ASR model training and aligned text pairs with machine translation.

wangkuiyi commented 7 years ago

@hedaoyuan Thanks for pointing out the current solution for @zuowang -- making use of the trainer id.

It is important to clarify that trainer id works only if we schedule Paddle jobs manually or using MPI-related technology. We will have to remove "trainer id" ASAP so could we support real big jobs.

To run big-scale training jobs, we'll have to make Paddle support auto fault recovery. Consider a job of 100 worker processes is scheduled by a modern distributed operating system like Mesos or Kubernetes. It is very often that these worker processes get pre-empted by other jobs with higher priority. If Paddle doesn't support fault recovery, each time a few worker processes get pre-empted, the whole job would have to be restarted. Thus the job would take much longer time to complete than we imagined.

People might ask why Mesos or Kubernetes. That is because a real business needs to run many kinds of jobs -- online servers, log collectors, online/offline data processing, data storage, and AI. Most companies would like to run all these jobs on a single cluster, so Mesos/Kubernetes could efficiently schedule workers that require CPU and those require main memory and those require disk I/O bandwidth on a single computer to make full use of it. Hence the idea of cloud computing.

Some cloud services used VMs to make good use of real computers, but VM introduces a lot of overhead and is replaced recently by Docker + Mesos/Kubernetes.

Let's get back to the key --- Mesos/Kubernetes could shut down workers of a Paddle job when the hardware resource doesn't meet the need of some new high-priority jobs. It could also restart (or add) workers when there is some idle resource. This means that each Paddle job could have an indefinite number of workers, and there is no way to assign 0-based consecutive worker/trainer ids.

A common solution to this is that we introduce a master process for each Paddle job, and the master assigns tasks to workers, where each task could be a training data file.

I hope this answers the question of @zuowang from the long-term perspective.