PaddlePaddle / Paddle

PArallel Distributed Deep LEarning: Machine Learning Framework from Industrial Practice (『飞桨』核心框架,深度学习&机器学习高性能单机、分布式训练和跨平台部署)
http://www.paddlepaddle.org/
Apache License 2.0
22.08k stars 5.55k forks source link

The imperative programming paradigm #7464

Closed wangkuiyi closed 6 years ago

wangkuiyi commented 6 years ago
r = fluid.data.mnist.train()

W = fluid.Tensor()
b = fluid.Tensor()

with fluid.While(iters=100):
    mb = r.next()

    x = fluid.layer.data(mb.image)
    l = fluid.layer.data(mb.label)
    y = fluid.layer.fc(x, W, b)
    cost = fluid.layer.mse(y, l)

    fluid.optimize(cost)

fluid.print(W, b)
wangkuiyi commented 6 years ago
r = fluid.data.mnist.train()

W = fluid.Tensor()
b = fluid.Tensor()

with fluid.While(iters=100):
    mb = r.next()

    x = fluid.layer.data(mb.image)
    l = fluid.layer.data(mb.label)
    y = fluid.layer.fc(x, W, b)
    cost = fluid.layer.mse(y, l)

    fluid.optimize(cost)

fluid.save(W, b, "model.json")
W, b = fluid.load("model.json", "W", "b")

mb = r.next()[1:2]
x = fluid.layer.data(mb)
y = fluid.layer.fc(x, W, b)
fluid.print(y)
wangkuiyi commented 6 years ago

We can use atexit to register a launcher.

For example, we could have a launcher which saves the ProgramDesc into a file, or another launcher that calls the C++ Executor class to run the ProgramDesc, or yet another launcher that calls a transpiler (or a pipe of transpilers) to convert the ProgramDesc into C++ programs and build them into a binary executable.

wangkuiyi commented 6 years ago

Do we still need the data layer, or should we just call fluid.layer.fc(input=mb.image, ...)?

reyoung commented 6 years ago

default

For fully parallel, we might use goroutine and channel.

BTW, I suggest we use Process instead of Goroutine, since

  1. Process is a general concept than goroutine. Sharing data between processes directly is forbidden. The only way to share data between processes is by using channels. (We can also rename channel to pipe or mailbox to adopt the concepts in the operating system.)

  2. We are not using golang and our users may not familiar with golang. However, users should familiar with the operating system.

Some basic thinking about parallel program

There is some basic thinking of imperative programming paradigm in parallel.

  1. Every Process will run a Program. The Program will be run when all inputs are ready. In other words, Process is a runtime Program. (It is a very common concept in the operating system.)

  2. Communication between Program can only use a channel or named pipe. As we show in the figure, the Channel is not a part of any program. So Channel should not be a Variable. It should be an independent type.

  3. We should create a new compile-type representation to manage multiple/concurrent Program and Channels. I am not sure the name of this representation. Maybe ConcurrentProgram or ProgramGroup?

There are some fundamental questions.

Q1: Should we use Channel and Process to implement CSP? A similar approach to CSP is actor model, which erlang implemented.

Here is a simple introduction about Actor Model:

I think the actor model may be better because

Q2: Should we implement coroutines or fibers for Process? Should we support yield the running (or release the CPU to another Process explicitly)?

This can be implemented by thread temporarily. But at least, we should have a single, long-term goal for everyone.

Q3: It is hard for users write a CSP or Actor Model directly. Should we provide a transpiler for this?

I think it is neccesary to privde a transpiler to a CSP or Actor Model.The whole idea about a CSP or Actor Model for deep learning is very exciting. However, it is very hard to implement CSP and a transpiler directly. We should devided it into several sub-tasks. Perhaps we will have some temporary implementations, but we should have a long-term goal and plan in details to get this splendid job done.

wangkuiyi commented 6 years ago

We might be able to use Python multiprocess to communicate between a Python process and a Fluid process. -- from @emailweixu

typhoonzero commented 6 years ago

I took a glance at http://ray.readthedocs.io/en/latest/tutorial.html, for distributed training fluid API, we may have two main design:

  1. Using channel
    
    grad_param_mapping = dict()

def def_trainer(): r = fluid.data.mnist.train() W = fluid.Tensor() b = fluid.Tensor() with fluid.While(iters=100): mb = r.next()

    x = fluid.layer.data(mb.image)
    l = fluid.layer.data(mb.label)
    y = fluid.layer.fc(x, W, b)
    cost = fluid.layer.mse(y, l)
    grads, params = fluid.bp(cost)
    for idx, grad in enumerate(grads):
    grad_param_mapping[grad.name] = params[idx]
    fluid.send(grads, overwrite=params)

def def_pserver(): with fluid.listen_and_serv() as server: with fluid.While(): grad = server.recv_chan.get() param = grad_param_mapping.get(grad.name) fluid.optimize(param, grad) server.return_chan.push(param)

2. Using actor mode (https://github.com/ray-project/ray/blob/master/examples/parameter_server/async_parameter_server.py)
```python
@fluid.remote
class Pserver:
    def __init__(self, name_map):
        self.grad_param_mapping = name_map
    def send(self, grad):
        param = self.grad_param_mapping.get(grad.name)
        fluid.optimize(param, grad)
    def get(self, varname):
        return self.grad_param_mapping.get(grad.name)

@fluid.remote
def trainer():
    ...
QiJune commented 6 years ago

A simple demo of integrating fluid and ray.

import ray
import paddle.v2 as paddle
import paddle.v2.fluid as fluid
import numpy

ray.init()

NUM_WORKERS = 2
BATCH_SIZE = 128

def mnist_network():
    img = fluid.layers.data(name='img', shape=[784])
    hidden = fluid.layers.fc(img, size=100, act='tanh',
                             param_attr='fc.w',
                             bias_attr='fc.b')
    prediction = fluid.layers.fc(hidden, size=10, act='softmax',
                                 param_attr='sftmax.w',
                                 bias_attr='sftmax.b')
    label = fluid.layers.data(name='label',shape=[1],
                              dtype='int64')
    loss = fluid.layers.cross_entropy(input=prediction, label=label)

    avg_loss = fluid.layers.mean(x=loss)
    fluid.backward.append_backward(avg_loss)
    return img, label, avg_loss

@ray.remote
class Worker(object):
    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.scope = fluid.core.Scope()
        self.program = fluid.Program()
        self.startup = fluid.Program()
        with fluid.program_guard(self.program, self.startup):
            img, label, self.loss = mnist_network()

        self.place = fluid.CPUPlace()
        self.executor = fluid.Executor(self.place)
        self.reader_creator = paddle.batch(
            paddle.reader.shuffle(
                paddle.dataset.mnist.train(), buf_size=8192),
            batch_size=BATCH_SIZE)

        self.reader = self.reader_creator()
        self.feeder = fluid.DataFeeder(feed_list=[img, label], place=self.place)

    def compute_gradient(self, weights):
        for var_name in weights:
            tensor = self.scope.var(var_name).get_tensor()
            tensor.set(weights[var_name][0], self.place)
            tensor.set_lod(weights[var_name][1])

        try:
            data = next(self.reader)
        except:
            self.reader = self.reader_creator()
            data = next(self.reader)

        outs = self.executor.run(self.program,
                     feed=self.feeder.feed(data),
                               scope=self.scope,
                               fetch_list=[ var_name + "@GRAD" for var_name in weights] + [self.loss])
        if self.worker_id == 0:
            print outs[-1]
        return outs[:-1]

@ray.remote
class PServer(object):
    def __init__(self, learning_rate):
        self.scope = fluid.core.Scope()
        self.learning_rate = learning_rate
        self.program = fluid.Program()
        self.startup = fluid.Program()
        with fluid.program_guard(self.program, self.startup):
            mnist_network()

        self.place = fluid.CPUPlace()
        self.executor = fluid.Executor(self.place)
        self.executor.run(self.startup, scope=self.scope)
        self.optimize_program = fluid.Program()

    def apply_gradients(self, *gradients):
        # TODO(qijun) an optimization program is needed
        mean_gradients = numpy.mean(gradients, axis=0)
        weights = self.get_weight()
        for idx, name in enumerate(weights):
            w = weights[name][0]
            w -= self.learning_rate * mean_gradients[idx]
            self.scope.find_var(name).get_tensor().set(w, self.place)

    def get_weight(self):
        weights = dict()
        for p in self.program.global_block().iter_parameters():
            lod_tensor = self.scope.find_var(p.name).get_tensor()
            weights[p.name] = (numpy.copy(numpy.array(lod_tensor)), lod_tensor.lod())
        return weights

if __name__ == '__main__':
    ps = PServer.remote(1e-3 * NUM_WORKERS)
    weights = ps.get_weight.remote()

    works = [Worker.remote(i) for i in range(NUM_WORKERS)]

    while True:
        gradients = [work.compute_gradient.remote(weights) for work in works]
        ps.apply_gradients.remote(*gradients)
        weights = ps.get_weight.remote()
typhoonzero commented 6 years ago

@QiJune in the above code, ps should be a member of Worker, and the worker should call get_weight after apply_gradients.

And, yes, using ray be a simple way to implement dist train, but I doubt it could use more memory because it's using a redis instance to store objects, so all the weights in pserver must store both in fluid and redis so that workers can fetch them, and I must take a look into the implementaion of ray to confirm it that's true.

reyoung commented 6 years ago

ps should be a member of Worker, and the worker should call get_weight after apply_gradients.

No.

gradients = [work.compute_gradient.remote(weights) for work in works]
ps.apply_gradients.remote(*gradients)

This example contains N workers and one pserver. In the main function, these lines will wait all workers complete.

typhoonzero commented 6 years ago

ps should be a member of Worker, and the worker should call get_weight after apply_gradients.

Sorry, my fault. @ray.remote decorated class is server instances, it should not be the member of the worker. Yet workers should update local weights after ps.apply_gradients.remote(*gradients)

chengduoZH commented 6 years ago
        try:
            data = next(self.reader)
        except:
            self.reader = self.reader_creator()
            data = next(self.reader)

This is to say that all the workers use their own data reader, right? @QiJune In my view, the Worker.compute_gradient's parameter should be data and weights.

QiJune commented 6 years ago

@chengduoZH This is just a simple demo following sync_parameter_server in ray. And we can add several data reader actors and a data provider actor for many workers.

QiJune commented 6 years ago

@typhoonzero Has refine the code and call get_weight method after apply_gradients. The loss curve seems normal.

typhoonzero commented 6 years ago

@QiJune Cool 👍!

reyoung commented 6 years ago

Just list some facts about Ray.

Ray might not be complete for our needs for several reasons:

  1. Ray is strongly tied to Python and using pickle to serialize objects.
  2. The scheduler is implemented inside Ray.
    • Ray can take over all CPUs and GPUs as a service, by ray start
    • In users' scripts, use ray.init can connect to the cluster service before.
    • Actors are managed by Ray schedulers. i.e., which node will execute the actor is decided by Ray framework.
    • We cannot use k8s to explicitly customize the scheduler without digging into Ray deeply.
    • The scheduler of Ray is not mature for now(according the issues and documentation, there could be schedule two actors on the same GPU when there are actually two GPUs).
  3. The Ray implements the Actor Model. However, Ray gives an object-oriented API for hiding the real message APIs of Actor Models. Comparing the standard actor model, there are some limits of Ray.

Goods about Ray.

  1. A separate project about communication. It can be easily integrated into any framework.
  2. Messages will only be read by its need. If an actor is just receiving a message and routing it to another actor, the message will not be read twice.
  3. Ray implements actor-model, which is scalable and fault-tolerant by nature.
  4. Ray has a WebUI for debugging.

We may not use Ray directly. However, we can learn a lot from this project. We may implement a separate project for communication and provides C++ API to the fluid. Fluid may provide some operators or higher-level concepts to represent Actor Model or CSP. It seems to be a practical way.

wangkuiyi commented 6 years ago

I quickly went over Ray. It seems an implementation of the Actor model in Python? In my understanding, Ray is heavily integrated with Python. However, Fluid is trying to get out of Python. Are they having conflicting ideas?

Yancey1989 commented 6 years ago

From @typhoonzero

for distributed training fluid API, we may have two main design: Using channel

def def_pserver():
with fluid.listen_and_serv() as server:
with fluid.While():
grad = server.recv_chan.get()
param = grad_param_mapping.get(grad.name)
fluid.optimize(param, grad)
server.return_chan.push(param)

For my personal opinion:

  1. Recv Op is executed in the background, so maybe we need to create a Goroutine and execute it and pass the channel as an argument.
  2. A Channel need a type.

So, maybe the program would look like:

def def_pserver():
    chan = fluid.Channel(type=sendrecv.message)
    with fluid.Go(chan):
        with fluild.listen_and_serv() as server:
                msg = server.get_msg()
                chan.push(msg)

    with fluid.While():
         grad = chan.get()
         param = grad_param_mapping.get(grad.name)
         fluid.optimize(param, grad)
         return_chan.push(param)
shanyi15 commented 6 years ago

您好,此issue在近一个月内暂无更新,我们将于今天内关闭。若在关闭后您仍需跟进提问,可重新开启此问题,我们将在24小时内回复您。因关闭带来的不便我们深表歉意,请您谅解~感谢您对PaddlePaddle的支持! Hello, this issue has not been updated in the past month. We will close it today for the sake of other user‘s experience. If you still need to follow up on this question after closing, please feel free to reopen it. In that case, we will get back to you within 24 hours. We apologize for the inconvenience caused by the closure and thank you so much for your support of PaddlePaddle Group!