Angel-ML / angel

A Flexible and Powerful Parameter Server for large-scale machine learning
Other
6.75k stars 1.6k forks source link

An discuss of PyAngel phase-2 #276

Open biaoma-ty opened 6 years ago

biaoma-ty commented 6 years ago

当前PyAngel的功能还非常有限,有许多工作要做,另外,API也不是很友好,这里开启一个关于PyAngel讨论,如果您对PyAngel感兴趣,请联系我或者在社区中反馈,谢谢。

当前PyAngel的API主要还是源自于Java版本,相当于Java版本的简单封装,在易用性上还有很多不足,同时也过多暴露了一些不必要的细节,增加了用户上手以及维护的难度,PyAngel由于在Phase-2阶段将会实现用户所需的全部API实现,用户可以完全通过Python接口实现自定义的算法模型

编程模型

以近几年的情况而言,大多数的计算框架都有一个显示的程序入口,来封装上下文环境以及变量创建,函数调用,比如:-

# Import `tensorflow`
import tensorflow as tf

# Initialize two constants
x1 = tf.constant([1,2,3,4])
x2 = tf.constant([5,6,7,8])

# Multiply
result = tf.multiply(x1, x2)

# Initialize Session and run `result`
with tf.Session() as sess:
  output = sess.run(result)
  print(output)

dtype = torch.FloatTensor

dtype = torch.cuda.FloatTensor # Uncomment this to run on GPU

N is batch size; D_in is input dimension;

H is hidden dimension; D_out is output dimension.

N, D_in, H, D_out = 64, 1000, 100, 10

Create random input and output data

x = torch.randn(N, D_in).type(dtype) y = torch.randn(N, D_out).type(dtype)

Randomly initialize weights

w1 = torch.randn(D_in, H).type(dtype) w2 = torch.randn(H, D_out).type(dtype)

- MxNet的mxnet[^3]:
```python
import mxnet as mx
from mxnet import nd
mx.random.seed(1)
x = nd.empty((3, 4))
print(x)

可以看出,这些框架都有一个显式的入口,用户通过这个入口可以导入相关的包,直接创建变量,计算等,将实现逻辑单一化,我们可以提供angel作为对应的成员,Angel封装MLConf,AngelConf等环境变量,持有Learner这一算法模型的抽象,用户通过angel来完成绝大多数的操作。

当前问题

目前的Angel的API还是命令式为主,和Java本身有一定的关系,有些复杂,对用户暴露的接口略多,尤其是Task。大多数场景下,Task这一层完全可以对用户透明,除非用户需要深度定制,完成特定计算单元上的操作,比如FPGA上的操作(这部分其实也属于后续需要做的事情,我们同样需要简洁易扩展的资源封装模块) 但目前,这些问题会将用户的精力无谓的分摊在语言的细节上面,阻止了新来用户尽快的将Angel部署搭建起来,算法框架应该让用户只关心算法的表达,Python之所以成为AI领域使用最广泛的语言也是基于此,比如NumPy,SciPy等将工业界发展成熟的C,Fortran的数值计算,统计建模库引入,上层通过很直观的操作就可以使用这些高性能库,比如下面的例子[^4]:

>>> import numpy
>>> a = numpy.range(12)
>>> a
array([0, 1, 2, 3, , 5, 6, 7, 8, 9, 10, 11])
>>> type(a)
<class 'numpy.ndarray'>
>>> a.shape
(12, 0)
>>> a.shape = 3, 4
>>> a
array([[ 0, 1, 2, 3],
       [ 4, 5, 6, 7],
       [ 8, 9, 10, 11]])
>>> a[:, 1]
array([1, 5, 9])
>>> a.transpose()
array([[ 0, 4, 8],
       [ 1, 5, 9],
       [ 2, 6, 10],
       [ 3, 7, 11]])

如果我们在PSModel这层的API能达到这样的水准,无疑可能会成为很好的机器学习框架的API了,将上面的numpy替换为angel就可以直接操作PSModel,就可以处理超大规模的矩阵,这是PyAngel Phase-2的目标,这里PyAngel也会将angel作为程序的上下文,用户在这里创建模型,加载数据,训练以及预测。

API Demo

User Interface Usage
Angel 封装用户的Angel环境上下文
PS_Model 本质是矩阵和向量计算操作的封装,用户可以在自定义的PS_Model中实现对Parameter Server和worker之间的数据fetch以及计算流程的定制
Learner 通过操作PS_Model来封装算法模型,比如模型加载/训练/预测三个典型操作

上述三个类将会作为PyAngel编程的核心概念,无论是调用现有算法型或者用户自定义模型,均可通过对上述三个类的操作来实现,下面举例说明:

# Import Angel context
import angel as ag

# Set params for cluster and algorithm model
params = {
    'feaureNum':127,
    'featureNzz':25,
    'treeNum':2,
    'treeDepth':2,
    'splitNum':10,
    'sampleRatio':1.0,
    'dataFmt':'libsvm',
    'learnRate':0.01,
    'psWorkerGroupNum':1,
    'workerTaskNum':1,
    'psNum':1 
}

# Create an GBDT model
gbdt_model = ag.create(model_name='gbdt')

# Train the GBDT model
gbdt_model.fit(params=params, input_path='your sample data path', model_path='your model path')
gbdt_model.predict(model_path='your model path', input_path='your input path' output_path='your output path')

如图所示,上面是一个 GBDT 的python版本的demo,调用model对模型进行训练并且预测数据的典型步骤如下:

参数 用途
Angel Feature number of train data
PS_Model Number of nonzero features
Learner Tree number
treeDepth Tree depth
splitNum Split number
sampleRatio Feature sample ratio
dataFmt Data format
learnRate Learning rate
psWorkerGroupNum Number of PS worker groups
workerTaskNum Number of tasks run in a worker
psNum Number of parameter servers

上面的Demo是用户去调用一个已经实现好的算法模型需要编写的代码,如果用户需要自定义算法模型,就需要自己实现相应的PS_Model的操作以及Learner来对算法做封装,同时需要在Angel中注册,先讨论下PS_Model的封装,PS_Model当前的功能如下:

域/方法名 用途
matrixCtx MatrixContext的实例句柄, 通过 new MatrixContext(modelName, row, col, blockRow, blockCol)创建
getTaskContext 返回TaskContext
getContext 返回MatrixContext
getClient 返回ps matrix client
getMatrixId(): Int 返回matrix ID
setNeedSave(Boolean): this.type 设置模型是否需要呗保存
setAttribute(String, String): this.type 通过KV对的方式设置matrix的属性
setAverage(Boolean): this.type 设定属性的平均值,返回值如果是true则表示调用update方法时,应该先用matrix除以task个数,然后在被发送到ps
setOplogType(String): this.type 设置matrix更新时的存储格式
setOplogType(MatrixOlLogType): this.type 设置matrix的row格式
setRowType(MLProtos.RowType): this.type 设置matrix的row格式
setLoadPath(String): this.type 设置模型加载的路径
setSavePath(String): this.type 设置模型保存的路径
syncClock(Boolean) matrix的同步锁
clock(Boolean): Future[VoidResult] 将缓存的matrix的操作刷写到持久化存储中(可选, 根据输入参数决定)以及更新matrix的同步锁
flush(): Future[VoidResult] 刷写缓存的matrix操作
increment(TVector) 为matrix增加一行类型相同的向量,这个操作将会在调用flush方法后同步到ps端
increment(List[Tvector]) 为matrix批量增加类型相同的向量,这个操作将会在调用flush方法后同步到ps端
get(GetFunc): GetResult 通过传入的GetFunc从ps端得到任何想要的matrix的部分
getRow(Int):TVector 通过传入的行索引得到某一行向量
getRows(RowIndex, Int): Map[Int, TVector] 通过传入的行索引以及行数批量得到连续的向量
getRows(Array[Int]): List[TVector] 通过传入的行索引的列表批量得到任意位置的向量
getRowsFlow(RowIndex, Int): GetRowsResult 通过流水线的方式得到任意位置开始的连续的向量
getRowsWithIndex(Int, Array[Int]): TVector 通过索引得到指定的索引的数组得到特定位置的值
update(UpdateFunc): Future[VoidResult] 通过psf更新matrix
zero() 将matrix清零
finalize(): Unit 类型默认实现
this(String, Int, Int, Int, Int, Boolean)(TaskContext) 构造参数
apply(String, Int, Int, Int, Int)(TaskContext) 构造参数

首先,可以改造的更加pythonic一些的地方是row相关的获取的方法,可以通过重写python中的magic method以及支持切片来实现直接操作matrix的方式,比如获取从第4行开始,连续的5组向量

matrix.getRows(3, 5)

可以改写为

matrix[3:8]

对于matrix的扩展,可以用

inc(matrix, t_vector)

来代替:

matrix.increment(tVector)

或者使用:

matrix += t_vector

其他类型的setter,getter等非matrix值直接操作的方法可以用字典操作来代替,如:

matrix.setOpLogType(...)

改为:

matrix['op_log_type'] = ...

上面是对PS_Model的修改的讨论,下面介绍下Learner的修改,这里的Learner专指com.tencent.angel.ml.MLLearner,而非Spark on Angel的Learner,目前MLLearner共有12个子类,每个子类对应一个算法实现,有 逻辑回归,线性回归,支持向量机,主题模型,K-Means,矩阵分解,梯度提升决策树,以及比较新的 MLR 等等,并且还在增加中,这些特定算法模型的Learner都需要实现一个train方法,在train方法中实现模型的训练过程,但是实现中并不是很优雅,比如LDALearner中,train方法并没有实现,而是自己写了一个train方法,感觉不是很优雅,python版本的实现需要考虑这些问题,因为一旦API确定,以后如果轻易修改会是灾难。Learner的实现这里,如果比较完备的实现,需要将Learner用python几乎重写一遍,工作量还是有一些的。 下面对用户自定义的算法模型做一个demo, 假设用户自定义了一个GBDT算法模块: 首先需要定义Model完成对底层matrix的封装:

class GBDTModel(PS_Model):
    params = [
        ('feat_num', conf[Angel.ML_FEATURE_NUM])
        ...
    ]
    sketch = new PSModel(...)
    sketch['...'] = ...
    add_ps_model('...', sketch)
    ...
    def predict(self):
        ...

之后需要在angel上下文中注册

gbdt_model = new GBDTModel()
ag.register(gbdt_model)

定义完毕Model之后,还需要进一步定义Learner

class MyGBDTLearner(MLLearner):
    def train(self):
        ...

然后像自定义的model一样,也需要在Angel中注册,这里其实也可以只注册Learner,而Model的注册步骤下放到Learner中,也即:

melin commented 5 years ago

这个一直没有进展

biaoma-ty commented 5 years ago

Angel的Python版本用户较少,这个issue提出后,也没有多少关注和讨论,暂时挂起,有必要的时候会继续

melin commented 5 years ago

Angel的Python版本用户较少,这个issue提出后,也没有多少关注和讨论,暂时挂起,有必要的时候会继续

实际python 需求是最大的,spark 官方也意识到spark mlib 太过工程化,不太适合算法人员使用,我们大部分任务都是写python,实际数据分析或者算法人员也只会sql+python,这也是这几年python 又火起来的原因。

我理解的这个issues没有什么人关注,是因为算法+分析人员不关注这个项目,因为是scala 写的,他们只关注python 的算法库,算法+分析人员他们不关注算法是否为分布式,在公司推广angel,他们基本不会关注,因为他们不会用。会写scala对python 研究也不深,他们更愿意写scala,对python 需求也不大。

image image (job 类型只scala/java 写的任务)