By the way, is it possible to just use scipy.optimize? That way it will certainly be correct.
Not sure if you are using standard L-BFGS (with parallel gradient computation: sum the worker gradients on the master, take an L-BFGS step) or the vector-free version (in which case I guess you would need to do your own implementation).
I believe that scipy.optimize will convert things to numpy arrays. These may not fit in local memory. The current implementation keeps everything on the cluster except the final parameters.
The scipy.optimize L-BFGS-B optimization method passes the arrays to a Fortran routine via F2PY.
https://github.com/scipy/scipy/blob/master/scipy/optimize/lbfgsb/lbfgsb.pyf
scipy.optimize offers numerous optimization methods, including L-BFGS-B ("L" for limited memory, "B" for simple bound constraints: min f(x) subject to l <= x <= u). The L-BFGS-B implementation wraps Fortran code by Richard H. Byrd et al. at Northwestern University via F2PY. F2PY became part of NumPy in 2007.
https://github.com/scipy/scipy/tree/master/scipy/optimize/lbfgsb
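For context, here is a minimal sketch of the `fmin_l_bfgs_b` interface under discussion: the objective callable returns `(loss, gradient)` as NumPy values, and nothing constrains how the callable computes them - it could just as well do distributed work internally. The Rosenbrock function below is just a stand-in objective:

```python
import numpy as np
from scipy.optimize import fmin_l_bfgs_b

def rosen(x):
    # classic Rosenbrock test function: returns (loss, gradient)
    loss = 100 * (x[1] - x[0] ** 2) ** 2 + (1 - x[0]) ** 2
    grad = np.array([
        -400 * x[0] * (x[1] - x[0] ** 2) - 2 * (1 - x[0]),
        200 * (x[1] - x[0] ** 2),
    ])
    return loss, grad

# fmin_l_bfgs_b only ever sees the reduced NumPy values, so the callable
# is free to compute them however it likes (e.g. on a cluster)
x_opt, f_opt, info = fmin_l_bfgs_b(rosen, x0=np.zeros(2))
```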
I see two candidates for BFGS when dask is distributed. This may also allow for a single-node implementation that does not use as much memory as the Northwestern University Fortran L-BFGS-B implementation. Xiaocheng Tang implemented a distributed L-BFGS in Scala using (2) to replace the non-distributed MLlib L-BFGS.
Xiaocheng Tang's distributed L-BFGS implementation for Apache Spark, based on (2):
https://github.com/LHAC/dac
MLlib optimization LBFGS.scala:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
(1) Decentralized Quasi-Newton Methods
Decentralized Broyden-Fletcher-Goldfarb-Shanno (D-BFGS), a variation of BFGS quasi-Newton: "fully distributed, where nodes approximate curvature information of themselves and their neighbors by satisfaction of the secant condition."
https://arxiv.org/pdf/1605.00933.pdf
(2) Large-scale L-BFGS using MapReduce
Weizhu Chen, Zhenghao Wang, Jingren Zhou (Microsoft)
"Vector-free L-BFGS, which avoids the expensive dot product operations in the two loop recursion" ... "We prove the mathematical equivalence of the new Vector-free L-BFGS and demonstrate its excellent performance and scalability using real-world machine learning problems with billions of variables in production clusters."
http://papers.nips.cc/paper/5333-large-scale-l-bfgs-using-mapreduce.pdf
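For reference, the core trick in the vector-free formulation is to gather every dot product the two-loop recursion needs into a single (2m+1) x (2m+1) Gram matrix - the only step that touches full-length vectors, and one that parallelizes naturally - and then run the recursion on scalar coefficients. Here is a single-machine NumPy sketch of my reading of that reformulation (illustrative names, not the paper's code):

```python
import numpy as np

def vlbfgs_direction(S, Y, g):
    """Sketch of the vector-free two-loop recursion.

    S, Y: lists of the last m correction pairs s_k = x_{k+1} - x_k and
    y_k = grad_{k+1} - grad_k (oldest first); g: the current gradient.
    """
    m = len(S)
    base = S + Y + [g]                       # the 2m+1 base vectors
    k = len(base)
    B = np.empty((k, k))
    for i in range(k):
        for j in range(i, k):
            # in a distributed setting these dot products are the only
            # operations on full-length vectors, and can all be computed
            # on the cluster in one pass
            B[i, j] = B[j, i] = base[i] @ base[j]

    # represent the direction as p = sum_j delta[j] * base[j], starting at -g
    delta = np.zeros(k)
    delta[2 * m] = -1.0
    alpha = np.zeros(m)
    for i in reversed(range(m)):
        rho = 1.0 / B[i, m + i]              # 1 / (s_i . y_i)
        alpha[i] = rho * (delta @ B[:, i])   # rho * (p . s_i)
        delta[m + i] -= alpha[i]
    # scale by the usual initial Hessian guess (s_m . y_m) / (y_m . y_m)
    delta *= B[m - 1, 2 * m - 1] / B[2 * m - 1, 2 * m - 1]
    for i in range(m):
        rho = 1.0 / B[i, m + i]
        beta = rho * (delta @ B[:, m + i])   # rho * (p . y_i)
        delta[i] += alpha[i] - beta
    return sum(d * b for d, b in zip(delta, base))

# tiny demo with m=2 history pairs in R^5 (random but curvature-positive)
rng = np.random.RandomState(0)
S = [rng.randn(5) for _ in range(2)]
Y = [s + 0.1 * rng.randn(5) for s in S]
print(vlbfgs_direction(S, Y, rng.randn(5)))
```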
The way Spark does L-BFGS is that it actually uses Breeze's optimization API and its L-BFGS (and OWLQN) implementation. That takes a function (`DiffFunction`) which effectively defines the gradient and value functions.
Now, the specific implementation of the `DiffFunction` in this case actually just does an `RDD.map` over the training data, aggregating gradient updates, and then reduces those per-partition gradients and ends up with a gradient vector on the master - that is what is passed through to the (master-local) Breeze L-BFGS implementation.
So my question was effectively whether the same approach would work with the scipy.optimize interface - where the `fn` is actually a distributed operation that ends up with a (reduced) array that is then passed to the core routine. I suspect that this may be possible. My point was that this might be the easiest route without reimplementing L-BFGS.
The "vector-free" approach mentioned above is a lot more scalable in terms of feature dimension. For lower feature dimension, it's likely the "Spark-like" approach is slightly faster. @stoneyv thanks for the reference to that vector-free implementation. There is also this one for Spark ML: https://github.com/yanboliang/spark-vlbfgs which also implements OWLQN for L1 regularization.
So I did a quick 'n' dirty attempt at a "Spark-like" approach to check, where the gradient and loss function at each iteration are (or should be) computed on the cluster and "reduced" locally to the driver. This function is passed to `scipy.optimize.fmin_l_bfgs_b`. (Note I'm unfamiliar with dask details and internals.)
Comparing to scikit-learn `LogisticRegression` (with `solver="lbfgs"` and effectively zero regularization, to ensure apples-to-apples):
```python
import dask
import dask.array as da
import numpy as np
from scipy.optimize import fmin_l_bfgs_b
from sklearn import datasets
from sklearn.linear_model import LogisticRegression

# make dataset
X, y = datasets.make_classification(n_classes=2, n_samples=1000)

# sklearn LR with effectively no regularization, using scipy's L-BFGS optimizer
lr = LogisticRegression(fit_intercept=False, solver="lbfgs", C=1e5, max_iter=10)
lr.fit(X, y)

# create dask implementation: chunk the data into blocks of 10 rows
Xda = da.from_array(X, 10)
yda = da.from_array(y, 10)

def sigmoid(x):
    '''Sigmoid function of x.'''
    return 1 / (1 + da.exp(-x))

def compute_logistic_loss_grad(beta, X, y):
    Xbeta = X.dot(beta)
    # loss
    eXbeta = da.exp(Xbeta)
    loss_fn = (da.log1p(eXbeta)).sum() - da.dot(y, Xbeta)
    # gradient
    p = sigmoid(Xbeta)
    gradient_fn = da.dot(X.T, p - y)
    # a single dask.compute evaluates both graphs in one pass over the data
    loss, gradient = dask.compute(loss_fn, gradient_fn)
    return loss, gradient

func = lambda x, *args: compute_logistic_loss_grad(x, *args)[0:2]

n, p = X.shape
beta = np.zeros(p)

new_beta_dask, loss_dask, info_dask = fmin_l_bfgs_b(
    func, beta, fprime=None,
    args=(Xda, yda),
    iprint=0, pgtol=1e-14, maxiter=10)

print(lr.coef_)
print(new_beta_dask)
```
```
[[ 0.16883034  0.27267816  0.14273016 -0.12745748  0.38907953  0.1066639
   4.14527893 -0.92145276  1.010557   -0.15387399  0.24767802 -0.10193765
  -0.11728545  0.05964625  0.27116258 -0.07226594  0.10390644  0.01430678
   0.20764474  0.78352777]]
[ 0.16883064  0.2726789   0.14273055 -0.12745777  0.3890805   0.10666412
  4.14528409 -0.92145383  1.01055841 -0.15387421  0.24767861 -0.10193787
 -0.11728571  0.05964624  0.27116302 -0.07226598  0.10390643  0.01430673
  0.20764521  0.78352865]
```
I would like to run this on a cluster with a larger dataset than I can make locally. Does anyone have a suggestion for replacing the following line with pure NumPy code (that I could then rewrite with dask.array)?

```python
X, y = datasets.make_classification(n_classes=2, n_samples=1000)
```
Here is a notebook of things running on a cluster. I artificially increased the dataset size by replicating it. I suspect that this may not be the best way to do this. Still, it's interesting to see the performance difference.
https://gist.github.com/mrocklin/cdd04293859764c542ccf547f81ab7a5
Also, I had to copy the gradient array coming back for some reason: Fortran was complaining about one of the inout parameters not being properly ordered (which is odd, given that it's one-dimensional).
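Presumably the workaround amounts to something like the following hypothetical helper (the exact line in the notebook may differ):

```python
import numpy as np

def fortran_safe(gradient):
    # Hypothetical helper: return a fresh, C-contiguous float64 copy of the
    # reduced gradient, so scipy's Fortran L-BFGS-B routine doesn't reject
    # the array passed for its inout parameter.
    return np.ascontiguousarray(gradient, dtype=np.float64)
```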
@mrocklin you could try:

```python
import numpy as np

def make_classification(n_samples=1000, n_features=100, n_informative=2, scale=1.0):
    X = np.random.normal(0, 1, size=(n_samples, n_features))
    # only a few of the features actually influence the label
    informative_idx = np.random.choice(n_features, n_informative)
    beta = (np.random.random(n_features) - 1) * scale
    z0 = X[:, informative_idx].dot(beta[informative_idx])
    # sample boolean labels from the logistic probabilities
    y = np.random.random(z0.shape) < 1 / (1 + np.exp(-z0))
    return X, y
```
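The same recipe could also be written directly against dask.array, so the data is generated in blocks and never materializes in one place. A rough, untested sketch (the function name, defaults, and chunking here are made up):

```python
import numpy as np
import dask.array as da

def make_classification_dask(n_samples=1000, n_features=100, n_informative=2,
                             scale=1.0, chunks=100000, seed=0):
    rs = da.random.RandomState(seed)
    # generate X lazily in (chunks, n_features) blocks
    X = rs.normal(0, 1, size=(n_samples, n_features),
                  chunks=(chunks, n_features))
    # beta is small (length n_features), so it stays a local numpy array
    informative_idx = np.random.choice(n_features, n_informative)
    beta = np.zeros(n_features)
    beta[informative_idx] = (np.random.random(n_informative) - 1) * scale
    z0 = X.dot(beta)                           # lazy blocked matvec
    # sample labels from the logistic probabilities, block by block
    y = rs.random_sample(size=z0.shape, chunks=z0.chunks) < 1 / (1 + da.exp(-z0))
    return X, y
```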
See notebook here: https://gist.github.com/mrocklin/4e486064882cce630ffb4ee4e39bc333
Just to make sure I understand what's going on here: between the client and the cluster, are we moving a vector of length equal to the number of columns, or of length equal to the number of rows?
Looking at the code, it looks like we're only moving data of size the number of columns.
It should be the former (the number of columns) if the implementation is correct.
@MLnick do the above timings seem sensible to you?
What is the cluster spec you used?
Eight m4.2xlarges. Seven held dask workers. Each dask worker was a single process with a thread pool of eight threads.
I found that about a third of the time was spent computing, a third communicating data, and a third in graph-construction overhead (which we should eventually be able to eliminate, given that it's exactly the same graph every time, or reduce by using larger block sizes).
Cool. I have a different cluster setup (4x worker nodes with 48 cores and 120GB RAM each).
I don't have Dask running, but did a quick test of Spark's LR on the same data size (100,000,000 samples, 10 dimensions, 2 classes).
I used 64 total cores and 256GB total RAM to try to match up more closely.
Spark runs in around 13-14 secs. So more or less what I might expect: Dask should be a little quicker, since it's lighter weight and, I guess, uses more blocked operations on the linear algebra side as well as native BLAS (my cluster's native BLAS is not linking correctly in the JVM currently - painful).
Yeah, similarly I would expect that Spark is better at aligning partitions in X and y and that the graph construction overhead is less. Presumably each project has something to learn from the other.
Are you able to post the code you used to run the experiment?
Also, this project is a bit janky, but you might find it interesting: https://github.com/mrocklin/dask-spark
The HIGGS dataset used in the XGBoost paper benchmarks has 11,000,000 rows and 28 attributes. The file HIGGS.csv.gz is 2.6 GB and is available under 'physical sciences' -> HIGGS at http://archive.ics.uci.edu/ml/datasets.html
Figure 9 in section 4.2 (cache-aware access) of the XGBoost paper shows a significant slowdown in time per tree versus number of threads when the block size was 2^24, compared with smaller block sizes: https://arxiv.org/pdf/1603.02754.pdf
TensorFlow used the Criteo 1TB click dataset for benchmarks this February. You may or may not like the terms, but it would be interesting to see how long dask-glm BFGS takes. Criteo 1TB click dataset: http://labs.criteo.com/2013/12/download-terabyte-click-logs/ TensorFlow benchmarks using the Criteo dataset (linear model: 70 minutes): https://cloud.google.com/blog/big-data/2017/02/using-google-cloud-machine-learning-to-predict-clicks-at-scale
I have the Criteo 1TB dataset on my cluster. I'll have to double check but I think I have part of it pre-processed into Parquet files of feature vector and label.
The total feature size is on the order of 800,000,000, so pretty large. I've found Spark LR tends to fall down at even 20-30 million dimensions. If I find some time I will try to run dask-glm on a few different feature sizes. (For other experiments I've been varying the size of the feature vector using feature hashing - see the sketch below.)
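For concreteness, the feature-hashing trick mentioned here boils down to something like the following generic sketch (not the actual pipeline used; a real version would use a stable hash such as MurmurHash rather than Python's `hash`):

```python
import numpy as np
import scipy.sparse as sp

def hash_features(raw_rows, n_features):
    # map arbitrary categorical tokens into a fixed-size sparse matrix;
    # n_features is the knob that sets the model dimension
    rows, cols = [], []
    for i, tokens in enumerate(raw_rows):
        for tok in tokens:
            rows.append(i)
            cols.append(hash(tok) % n_features)
    data = np.ones(len(rows))
    # duplicate (row, col) entries are summed by the CSR constructor
    return sp.csr_matrix((data, (rows, cols)),
                         shape=(len(raw_rows), n_features))

X = hash_features([["ad=1", "site=a"], ["ad=2", "site=b"]], n_features=2 ** 20)
```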
How is it laid out within Parquet?
My guess is that we would want to manage this as blocked scipy.sparse arrays. I could probably work up a sparse implementation shortly that supports the operations necessary for `compute_logistic_loss_grad`.
It's laid out as a "label" column (double) and a "vector" column (sparse vector of doubles, basically a SparkSQL / Parquet struct).
Spark LR works row-wise: it maps an aggregation function over each (label, vector) row and aggregates the gradient by summing. This happens per partition; then the gradient vector for each partition is reduced (again, summed) to one gradient vector back on the driver.
I'm not sure if dask arrays support sparse? It seems not. But that would be required for data like Criteo, which ends up as one-hot-encoded or hashed categorical features with very high cardinality.
What is the encoding of the sparse vector? Is this part of the parquet standard or something specific to Spark?
You're correct that dask.arrays don't support sparse, although I added them in a branch this morning: https://github.com/dask/dask/pull/2179
Really though, it would probably be more straightforward to do what you describe using dask.delayed.
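A sketch of what that could look like (hypothetical names; each block assumed to be a scipy.sparse CSR matrix plus label array already living on the cluster), mirroring the per-partition aggregate-then-reduce pattern described above:

```python
import numpy as np
import dask
from dask import delayed

@delayed
def partial_loss_grad(X_block, y_block, beta):
    # per-partition logistic loss and gradient, as in the dense version above;
    # X_block is a scipy.sparse CSR matrix, y_block a numpy label array
    Xbeta = X_block.dot(beta)
    loss = np.log1p(np.exp(Xbeta)).sum() - y_block.dot(Xbeta)
    p = 1 / (1 + np.exp(-Xbeta))
    grad = X_block.T.dot(p - y_block)
    return loss, grad

def logistic_loss_grad(blocks, beta):
    # compute all per-block pieces on the cluster, reduce on the driver
    parts = dask.compute(*[partial_loss_grad(Xb, yb, beta) for Xb, yb in blocks])
    losses, grads = zip(*parts)
    return sum(losses), np.sum(grads, axis=0)
```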
@MLnick do you have any interest in contributing a PR with a genericized version of your BFGS solution to `dask_glm/algorithms.py`? I think that it would be useful to have.
Sorry for the delay - yes I will try to work something up soon.
Kept.
Currently, the implementation of `bfgs` isn't always converging, and the implementation is messy. Should we spend some time working on this, or toss it? If we're going to keep it, what exactly is going on with the implementation?