PaddlePaddle / Paddle

PArallel Distributed Deep LEarning: Machine Learning Framework from Industrial Practice (『飞桨』核心框架,深度学习&机器学习高性能单机、分布式训练和跨平台部署)
http://www.paddlepaddle.org/
Apache License 2.0
22.17k stars 5.56k forks source link

Paddle 应该是什么形态 #594

Closed wangkuiyi closed 7 years ago

wangkuiyi commented 7 years ago

如果要允许用户在iPython和Jupiter之类的界面里写Paddle程序,那么Paddle得是一个library(提供本地函数调用)或者一个RPC server(提供远程函数调用),而不能是目前的executable command line tool的形式。

在library和RPC server之间的选择是个问题。欢迎大家讨论。

wangkuiyi commented 7 years ago

上述问题实际上是两个问题:

  1. API应该是底层(或者说细粒度)的,还是高层(粗粒度)的?
  2. API的形式应该是本地调用(library)还是远程调用(RPC)?

当然,如果选择是RPC,那么就应该是粗粒度API了,因为用RPC描述细粒度API的运行时开销太大了。

wangkuiyi commented 7 years ago

关于API粒度,大家有一个讨论,记录在这里:

Distributed Tensorflow这篇文章看,Tensorflow的API很底层。一方面允许Python程序描述很细节的概念,另一方面让用户学习曲线比较高。很多用户实际上用的是Tensorflow的wrapper Keras。Paddle的API应该是Tensorflow API这个层次的呢,还是Keras这个层次的呢?

backyes commented 7 years ago

@wangkuiyi

感觉先要确定试图达到的目标, 目标是让用户好用,还是让用户灵活使用? 还是让系统工程师更容易集成PaddlePaddle到不同的平台,抑或是其他目标。

@wangkuiyi 对paddle是抽象出api,还是抽象出rpc,还是现在的command line进程方式的讨论,关乎到paddlepaddle能潜在的渗透到哪些不同平台,哪些不同领域,这非常赞。目前单一的在mpi、ssh多机、k8s等等有限的场景下,确实还有待进一步开发。 不知道我有没有领会了 @wangkuiyi 的宗旨?

luotao1 commented 7 years ago

希望能兼顾好用和灵活使用

让系统工程师更容易集成PaddlePaddle到不同的平台

这点是不是可以往后放。

zhouxiao-coder commented 7 years ago

集成平台的事情感觉可以由团队提供技术支持,involve 外面的团队进来做,接口不必暴露的太多。

llxxxll commented 7 years ago

@backyes 好用和灵活应该是面向两种不同的场景。

我对好用的理解:

其实这两个并不冲突,这个issue应该更侧重于系统本身的形态,针对普通用户的讨论应该在另外一个issue中。

reyoung commented 7 years ago

整体设计code

master.py

# Master是整体全局上的一个注册机制。所有的信息会注册到这个Master里面,master是一个信息同步的节点。
import paddle

# redundancy 是说这个世界中的pserver需要有2个副本。
# 也就是每一个ParameterBlock存放到两个不同的pserver上。默认是1
master = paddle.Master(addr=":8000", redundancy=2) 

# 开始监听,直到退出
master.start_and_join()

client.py

import paddle

# Context是使用设备的上下文。Paddle究竟使用多少设备,在这里指定
context = paddle.Context(devices=[paddle.cpu_all, paddle.gpu_all])  # use all device in one node.

# Context可以连接到一个Master。连接到master就是集群训练。否则就是单机训练。
context.connect("master_ip",  role=paddle.WORKER)

# 定义一个网络。前面的注解说明这个函数是一个网络定义。
@context.network()
def simple_network(network):
      # network参数是一个网络定义的函数集合,包括了我们支持的layers
      ipt = network.data_layer(name="input", size=784)
      hidden = network.fc_layer(input=ipt, size=200)
      predict = network.fc_layer(input=hidden, size=10, act=SoftmaxActivation())
      cost = network.classification_cost(input=predict, label=network.data_layer(name="input", size=10))
      return cost  # 返回优化的目标。相当于现在paddle的outputs

# define a data provider, same as current Paddle process.
@paddle.provider()
def process_data(settings, filename):
    for sample in read_from_file(filename):
         yield sample

# train all networks in current context.
context.with_train_data(train_files=["a.txt", "b.txt"], method=process_data)  # set train data, and data provider
     .with_test_data(test_files=["c.txt"], test_period=Batch(1000), method=process_data) # set test data
     .use_optimizer(SgdOptimizer())  # set optimizer.
     .standard_sgd_train(num_passes=100)  # set use standard sgd strategy to train 100 pass.

context.exit(0)  # send exit message to scheduler, to kill all pserver.

pserver.py

import paddle
context = paddle.Context(devices=[paddle.cpu_all])  # pserver only use all cpu is enough.

# Hello master, I'm a pserver.
context.connect("master_ip", role=paddle.PSERVER)

# Create a standard pserver.
pserver = context.new_standard_pserver()

# Start and join.
pserver.start_and_join()

实现重点

这里实现重点在于

wangkuiyi commented 7 years ago

@reyoung 在你上面思路里,pserver/worker/master 这几个服务是Paddle团队写的,还是用户写的?

reyoung commented 7 years ago

Paddle 集群训练重构和API

重构后可以解决的问题

图片

新的集群训练拓扑结构

图片

角色

新的集群训练的角色共有四种,每一种角色的简单定义如下\:

其中,基本的原则是:

Parameter和Parameter Block

在说明各个角色的使用方法之前,我们先说明一些Paddle基本的概念。

一个Parameter指的是神经网络某一层的参数和参数同维度的东西。例如,参数值(Value), 参数梯度(Gradient),参数动量(Momentum)。每一个神经网络中的每一个参数都有一个唯一的名字。

ParameterBlock是指,每一个神经网络的参数被切分成的子块。这些子块均匀分布在各个PServer上,整体构成一个Parameter。每一个ParameterBlock包含Parameter的id,和Parameter的区间段 [0,200], [200, 400]之类的。

PServer能看到的基本数据是ParameterBlock

Client => Master API

Client会将模型配置和dataprovider的函数,全部序列字符串送交到Master上执行(可以使用python的pickle包)。序列化上去之后,如果需要哪些第三方库,也可以将依赖的名字使用字符串传递给Master。

Client可以读写Master上的任意KV数据。client通过读写这些KV数据来控制训练进程Master上面会根据客户端传过来的模型配置,dataprovider函数(其实应该是一个main.py),和依赖脚本,打包成一个Dockerfile,分发给各个worker启动。

FROM paddle:latest
COPY main.py /
COPY requirements.txt / 
COPY packages.txt /
RUN pip install -U /requirements.txt # 安装依赖
RUN apt install -y $(cat packages.txt) # 安装依靠的debian包,这个可以去掉
ENTRYPOINT python /main.py --master_addr="192.168.170.1:54321"  # connect to master.

执行流程是:

Worker => PServer API

Worker和PServer之间的沟通非常简单。只有两点,

并且这两点完全无锁。也就是只要worker向PServer请求,PServer就把最新的东西给Worker

Master => PServer API

Master到PServer之间的沟通非常简单, 只有几个操作。

PServer => Master API

Worker => Master API

Worker的多机训练逻辑

master = ...  # master api
master.set_value_ne("num_passes", num_passes)
passes = master.get_value("num_passes")
while passes != 0:
    data = master.get_data_list()
    download_and_remove_data(data)  # get data from hdfs if data in data_list, remove data not in this list.
    data_provider.feed(data)  # data provider use data.
    pass_bar = master.create_barrier("start_pass")
    for each_batch in data_provider:
        master.wait_value("pause", False)  # wait training is not paused. Training will paused when optimizer
                                           # are changed, some pserver is down and we need backup param block to 
                                           # other pserver

        prefetch_bar = master.create_barrier("prefetch")
        param_blocks = gradient_machine.prefetch(each_batch)  # get all param_blocks used in this batch.
        # Get latest params from pservers.
        blocks_pserver_dict = master.get_pservers_from_param_blocks(param_blocks)
        parallel_for each_block in param_blocks:  # parallel for means we could use multiple thread for it.
            master.create_barrier("merge_gradient_"+ each_block.id)  # create barrier for merge gradient.
            pservers = blocks_pserver_dict[each_block]
            master.wait_value("value_ready_%s"%each_block.id, True)  # wait block value ready.
            for pserver in pservers:
              try:
                pserver.update(each_block)  # get value from pserver for each_block
                break
              except: continue

        prefetch_bar.wait()  # prefetch should be synced, because here we determined "merge_gradient" 
                             # barrier size.

        # local neural network has the lastest value.
        gradient_machine.forward_backward(lambda param: {
           # backward on each parameter callback.
           async { # do this job in background
             parallel_for each_block in param.blocks():
               for pserver in each_block.pservers():  # each block could associate to many pserver
                 pserver.push_gradient(each_block)  # push gradient, sync call. No matter it is success or not.

               # all gradient is pushed to pserver, we can reset local gradient here.

               async { each_block.reset_gradient() }
               master.do_ops(ops=master.get_value("Optimizer"),  # if all client after push gradient.
                            before_wait_bar="merge_gradient_%s" % each_block.id,  # wait all gradient pushed.
                            after_notify_value="value_ready_%s"%each_block.id)  # notify value complete 
                                                                                # if sgd done.
          }
        })

        gradient_machine.finish_batch()
    pass_bar.wait()  # sync all worker to each pass end.

Worker由于每一个pass都是从master重新获得训练数据列表,每一个batch都是向pserver重新获得训练参数,每一个barrier都是在worker端向master申请,worker退出时,网络连接断开,master可以增加析构掉对应的barrier。避免死锁。

所以Worker可以随便挂。

Master的实现逻辑

Master预先启动一堆PServer,然后启动的时候有一定的冗余性,比如同一个ParamBlock存储在两个或者两个以上的PServer上。

当worker断线时候的流程,是:

def on_worker_disconnected():
  barriers = get_barrers_from_worker()
  destroy_barriers(barriers)

当PServer断线时候的流程是(可以先不实现):

def on_pserver_disconnected():
  blocks = get_myself_blocks()
  set_value("pause", true)
  remove_myself_in_pserver_hash()
  rerange_blocks_to_pservers()
  set_value("pause", false)

当新增PServer的时候(可以先不实现):

def on_new_pserver():
   blocks = get_pserver_heavy_blocks()  # rerange blocks.
   copy_blocks_to_new_pserver()
backyes commented 7 years ago

有个问题哈: master作为总控节点充当了调度角色,调度节点需要从可靠性(宕机)、性能(都从master获取参数)、一致性(如果用副本实现master可靠性,面临副本一致性),似乎短期内(一两个月内)实现有压力,而且也有很多重复性工作。 个人觉得还是尽量复用已有系统,避免全部重新设计master。

你们觉得呢?

来自 魅蓝 note 3

-------- 原始邮件 -------- 发件人:Yu Yang notifications@github.com 时间:周二 11月29日 13:05 收件人:PaddlePaddle/Paddle Paddle@noreply.github.com 抄送:"Wang,Yanfei(IDL)" wangyanfei01@baidu.com,Mention mention@noreply.github.com 主题:Re: [PaddlePaddle/Paddle] Paddle 应该是什么形态 (#594)

Paddle 集群训练重构和API 重构后可以解决的问题

[图片]https://camo.githubusercontent.com/8866b856326da2258ccad8cbc6b01d461f43336c/687474703a2f2f626f732e6e6a2e6270632e62616964752e636f6d2f76312f6167726f75702f32336331346337613732623233656138383835373637636438373437633831373761373766613536

新的集群训练拓扑结构

[图片]https://camo.githubusercontent.com/b0a38ea5977ebd5c307f7f7a92626c81a35e8949/687474703a2f2f626f732e6e6a2e6270632e62616964752e636f6d2f76312f6167726f75702f31333933653634623032643536633734616330336266393438353265663130653364653139623663

角色

新的集群训练的角色共有四种,每一种角色的简单定义如下:

其中,基本的原则是:

Parameter和Parameter Block

在说明各个角色的使用方法之前,我们先说明一些Paddle基本的概念。

一个Parameter指的是神经网络某一层的参数和参数同维度的东西。例如,参数值(Value), 参数梯度(Gradient),参数动量(Momentum)。每一个神经网络中的每一个参数都有一个唯一的名字。

ParameterBlock是指,每一个神经网络的参数被切分成的子块。这些子块均匀分布在各个PServer上,整体构成一个Parameter。每一个ParameterBlock包含Parameter的id,和Parameter的区间段 [0,200], [200, 400]之类的。

PServer能看到的基本数据是ParameterBlock

Client => Master API

Client会将模型配置和dataprovider的函数,全部序列字符串送交到Master上执行(可以使用python的pickle包)。序列化上去之后,如果需要哪些第三方库,也可以将依赖的名字使用字符串传递给Master。

Client可以读写Master上的任意KV数据。client通过读写这些KV数据来控制训练进程Master上面会根据客户端传过来的模型配置,dataprovider函数(其实应该是一个main.py),和依赖脚本,打包成一个Dockerfile,分发给各个worker启动。

FROM paddle:latest COPY main.py / COPY requirements.txt / COPY packages.txt / RUN pip install -U /requirements.txt # 安装依赖 RUN apt install -y $(cat packages.txt) # 安装依靠的debian包,这个可以去掉 ENTRYPOINT python /main.py --master_addr="192.168.170.1:54321" # connect to master.

执行流程是:

Worker => PServer API

Worker和PServer之间的沟通非常简单。只有两点,

并且这两点完全无锁。也就是只要worker向PServer请求,PServer就把最新的东西给Worker

Master => PServer API

Master到PServer之间的沟通非常简单, 只有几个操作。

PServer => Master API

Worker => Master API

Worker的多机训练逻辑

master = ... # master api master.set_value_ne("num_passes", num_passes) passes = master.get_value("num_passes") while passes != 0: data = master.get_data_list() download_and_remove_data(data) # get data from hdfs if data in data_list, remove data not in this list. data_provider.feed(data) # data provider use data. pass_bar = master.create_barrier("start_pass") for each_batch in data_provider: master.wait_value("pause", False) # wait training is not paused. Training will paused when optimizer

are changed, some pserver is down and we need backup param block to

                                       # other pserver

    prefetch_bar = master.create_barrier("prefetch")
    param_blocks = gradient_machine.prefetch(each_batch)  # get all param_blocks used in this batch.
            # Get latest params from pservers.
    blocks_pserver_dict = master.get_pservers_from_param_blocks(param_blocks)
    parallel_for each_block in param_blocks:  # parallel for means we could use multiple thread for it.
            master.create_barrier("merge_gradient_"+ each_block.id)  # create barrier for merge gradient.
            pservers = blocks_pserver_dict[each_block]
            master.wait_value("value_ready_%s"%each_block.id, True)  # wait block value ready.
            for pserver in pservers:
              try:
                pserver.update(each_block)  # get value from pserver for each_block
                break
              except: continue

    prefetch_bar.wait()  # prefetch should be synced, because here we determined "merge_gradient"
                         # barrier size.

    # local neural network has the lastest value.
    gradient_machine.forward_backward(lambda param: {
               # backward on each parameter callback.
               async { # do this job in background
                 parallel_for each_block in param.blocks():
                   for pserver in each_block.pservers():  # each block could associate to many pserver
                     pserver.push_gradient(each_block)  # push gradient, sync call. No matter it is success or not.

                   # all gradient is pushed to pserver, we can reset local gradient here.

                   async { each_block.reset_gradient() }
                   master.do_ops(ops=master.get_value("Optimizer"),  # if all client after push gradient.
                                before_wait_bar="merge_gradient_%s" % each_block.id,  # wait all gradient pushed.
                                after_notify_value="value_ready_%s"%each_block.id)  # notify value complete
                                                                                    # if sgd done.
              }
            })

    gradient_machine.finish_batch()
pass_bar.wait()  # sync all worker to each pass end.

Worker由于每一个pass都是从master重新获得训练数据列表,每一个batch都是向pserver重新获得训练参数,每一个barrier都是在worker端向master申请,worker退出时,网络连接断开,master可以增加析构掉对应的barrier。避免死锁。

所以Worker可以随便挂。

Master的实现逻辑

Master预先启动一堆PServer,然后启动的时候有一定的冗余性,比如同一个ParamBlock存储在两个或者两个以上的PServer上。

当worker断线时候的流程,是:

def on_worker_disconnected(): barriers = get_barrers_from_worker() destroy_barriers(barriers)

当PServer断线时候的流程是(可以先不实现):

def on_pserver_disconnected(): blocks = get_myself_blocks() set_value("pause", true) remove_myself_in_pserver_hash() rerange_blocks_to_pservers() set_value("pause", false)

当新增PServer的时候(可以先不实现):

def on_new_pserver(): blocks = get_pserver_heavy_blocks() # rerange blocks. copy_blocks_to_new_pserver()

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHubhttps://github.com/PaddlePaddle/Paddle/issues/594#issuecomment-263477374, or mute the threadhttps://github.com/notifications/unsubscribe-auth/AA4Ml8fEw6sgoWbcmLCQj5FGgDZRFKXlks5rC7KkgaJpZM4K7VT5.

reyoung commented 7 years ago

master作为总控节点充当了调度角色

Master没有调度的角色。

Master: 集群管理的主节点。主要为集群提供: K-V数据库、每一个ParamBlock的哈希表、全局锁、对PServer的操作

reyoung commented 7 years ago

如果要完成上述重构工作,需要完成下面几项内容(我分成其他几个issue独立讨论):

  1. 可以使用python来驱动单机训练 #650
  2. 选择一个合适的RPC框架,或者优化目前的RPC框架 #647
  3. 简化PServer的书写,提取出Master节点 #649 depends_on #647
  4. IPython notebook的单机训练 depends_on 1.
  5. 为master、pserver添加python api depends_on 3.
  6. python的集群训练开发 depends_on 1. 5.
wangkuiyi commented 7 years ago

Paddle的新体系结构里要有一个master process,它负责把一个job拆分成tasks,并且组织workers和pservers来完成这些tasks。这个角色和很多分布式框架(包括MapReduce)类似。

目前Paddle只支持offline learning。所以每个task的很可能是一组minibatches。Paddle master通过把不同的task分给不同的worker processes去做,来实现data parallelism。

未来Paddle应该支持online learning,这样才能支持好reinforcement learning。这样的情况下,master会成为数据流的入口——master负责把流入的数据分组成minibatches,然后分发给workers执行。

除了分发(dispatch)tasks,master还应该是和客户程序交互的接入点——master应该是一个Restful API server,和运行在用户机器上的Paddle程序交互,听从安排,和反馈工作进度以及状态。

reyoung commented 7 years ago

@wangkuiyi 综上,我看了下Master希望存在的功能。我们是不是可以简化成三件事情完成?或者,在这三件事情的框架内,能否完成当前功能?

主体的思路是,能用一些标准的K-V数据库解决的工作,就直接引入K-V数据库。然后,将所有操作都和这个数据库紧密相联就好了。将控制,推送数据,汇报进度,都变成读写数据库操作就好了。

hohdiy commented 7 years ago

我这里也有一些粗浅的想法,之前了解过tf的设计以及spark on yarn的设计。 首先,不妨将机器学习的训练的整个步骤分成如下三个:

  1. 训练数据读入(对应paddle的data provider)
  2. 模型定义(对应model config)
  3. 模型执行(对应paddle train,实际上这里应该有逻辑计划和物理计划的生成)

其次考虑集群的模样:

  1. 像hadoop MR集群,有一个大的集群,所有的任务都提交到这里相互争抢,共享资源。
  2. 像spark on yarn,有一个底层的资源池,当需要时构建一个小的执行集群,执行任务,每个作业独占这个小的“虚拟”集群,这个用户可以持续提交其他的作业,不用的时候就销毁掉。

这两种集群模式的区别在于混布级别不同。个人认为,如果不是追求高的资源利用率,点2是个好的选择,它把资源管理和应用管理隔离,应用可以以独占形式运行,对应用的要求将会被大大降低。

我曾经准备在tf上做这样一个事情,构建一个app master来屏蔽单机和集群训练的差异,让用户的代码无需修改就可以适应运行,对于paddle应该也是类似:

reyoung commented 7 years ago

Closed because we are working on refactorization now.