|Build Status| |PyPi| |Gitter| |Gitential|
PySpark + Scikit-learn = Sparkit-learn
GitHub: https://github.com/lensacom/sparkit-learn
Sparkit-learn aims to provide scikit-learn functionality and API on PySpark. The main goal of the library is to create an API that stays close to sklearn's.
The driving principle is "Think locally, execute distributively." To accommodate this concept, the basic data block is always an array or a (sparse) matrix, and operations are executed at the block level.
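The sketch below illustrates this idea using only plain PySpark and NumPy (none of splearn's own classes): each partition's records are packed into one NumPy block, and ordinary NumPy code then runs on every block in parallel. It assumes an existing SparkContext ``sc``, as in the examples further down.

.. code:: python

import numpy as np

# plain PySpark: 20 scalar records distributed over 2 partitions
rdd = sc.parallelize(range(20), 2)

# "think locally": pack each partition's records into one numpy block,
# then run ordinary numpy code on every block in parallel
blocks = rdd.glom().map(np.array)              # RDD of numpy arrays
doubled = blocks.map(lambda block: block * 2)  # block-level numpy operation

print(doubled.collect())  # two blocks: [0, 2, ..., 18] and [20, 22, ..., 38]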
To run the examples in IPython Notebook on a local Spark instance:

.. code:: bash
PYTHONPATH=${PYTHONPATH}:.. IPYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --master local\[4\] --driver-memory 2G
To run the tests:

.. code:: bash
./runtests.sh
Sparkit-learn introduces three important distributed data formats:
ArrayRDD:
A numpy.array-like distributed array
.. code:: python
from splearn.rdd import ArrayRDD
data = range(20)
rdd = sc.parallelize(data, 2) # each partition with 10 elements
X = ArrayRDD(rdd, bsize=5) # 4 blocks, 2 in each partition
Basic operations:
.. code:: python
len(X)  # 20 - number of elements in the whole dataset
X.blocks  # 4 - number of blocks
X.shape  # (20,) - the shape of the whole dataset
X # returns an ArrayRDD
X.dtype # returns the type of the blocks
X.collect() # get the dataset
X[1].collect() # indexing
X[1] # also returns an ArrayRDD!
X[1::2].collect() # slicing
X[1::2] # returns an ArrayRDD as well
X.tolist() # returns the dataset as a list
X.toarray() # returns the dataset as a numpy.array
X.getNumPartitions() # 2 - number of partitions
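As a quick sanity check, the blocked dataset can be pulled back to the driver and compared with the original local data; this sketch relies only on the methods listed above:

.. code:: python

import numpy as np

# X was built from range(20); toarray() gathers every block and
# returns the whole dataset as one numpy array on the driver
assert np.array_equal(X.toarray(), np.arange(20))

# tolist() does the same, but returns a plain Python list
assert X.tolist() == list(range(20))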
SparseRDD:
The sparse counterpart of the ArrayRDD; the main difference is that its blocks are sparse matrices. The reason behind this split is to follow the distinction between numpy.ndarrays and scipy.sparse matrices. Usually a SparseRDD is created by splearn's transformers, but it can be instantiated directly as well.
.. code:: python
from splearn.rdd import SparseRDD

from sklearn.feature_extraction.tests.test_text import ALL_FOOD_DOCS
ALL_FOOD_DOCS
X = ArrayRDD(sc.parallelize(ALL_FOOD_DOCS, 4), 2)
X.collect()
from splearn.feature_extraction.text import SparkCountVectorizer

vect = SparkCountVectorizer()
X = vect.fit_transform(X)
X
X.dtype
X[2:4].collect()
X.sum(), X.mean(), X.max(), X.min()
X.sum(axis=1)
X.todense()
X.todense().collect()
import numpy as np
import scipy.sparse as sp

sparse = sc.parallelize(np.array([sp.eye(2).tocsr()]*20), 2)
sparse = SparseRDD(sparse, bsize=5)
sparse
sparse.collect()
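Since the blocks of a SparseRDD are ordinary scipy.sparse matrices, the collected blocks can be glued back into a single local sparse matrix with plain SciPy. A minimal sketch, assuming ``sparse`` is the SparseRDD built above:

.. code:: python

import scipy.sparse as sp

# collect() returns the blocks as local scipy.sparse matrices;
# vstack stacks them into one sparse matrix on the driver
local = sp.vstack(sparse.collect())
print(local.shape, type(local))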
DictRDD:
A column-based data format, where each column has its own type.
.. code:: python
from splearn.rdd import DictRDD
X = range(20)
y = list(range(2)) * 10
X_rdd = sc.parallelize(X, 2)  # each partition with 10 elements
y_rdd = sc.parallelize(y, 2)  # each partition with 10 elements
Z = DictRDD((X_rdd, y_rdd), columns=('X', 'y'), bsize=5, dtype=[np.ndarray, np.ndarray]) # 4 blocks, 2/partition
or:

.. code:: python

import numpy as np

data = np.array([range(20), list(range(2))*10]).T
rdd = sc.parallelize(data, 2)
Z = DictRDD(rdd, columns=('X', 'y'), bsize=5, dtype=[np.ndarray, np.ndarray])
Basic operations:
.. code:: python
len(Z)  # 8 - number of blocks
Z.columns  # returns ('X', 'y')
Z.dtype  # returns the types in correct order
Z # returns a DictRDD
Z.collect()
Z[:, 'y']  # column select - returns an ArrayRDD
Z[:, 'y'].collect()
Z[:-1, ['X', 'y']]  # slicing - returns a DictRDD
Z[:-1, ['X', 'y']].collect()
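Column selection composes with the ArrayRDD operations shown earlier, so each column can be pulled back to the driver on its own. A small sketch, assuming ``Z`` is the DictRDD built above and that the selected column behaves like an ArrayRDD:

.. code:: python

import numpy as np

# a column select returns an ArrayRDD, so toarray() yields the local column
X_local = Z[:, 'X'].toarray()
y_local = Z[:, 'y'].toarray()

print(X_local.shape, y_local.shape)  # 20 elements in each column
print(np.unique(y_local))            # the two label values, 0 and 1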
With these data structures, the basic workflow is almost identical to sklearn's.
Distributed vectorizing of texts
================================
SparkCountVectorizer
^^^^^^^^^^^^^^^^^^^^
.. code:: python
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkCountVectorizer
from sklearn.feature_extraction.text import CountVectorizer
X = [...] # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4)) # sc is SparkContext
local = CountVectorizer()
dist = SparkCountVectorizer()
result_local = local.fit_transform(X)
result_dist = dist.fit_transform(X_rdd) # SparseRDD
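To compare the two results locally, the SparseRDD can be collected and stacked with SciPy. This is just a sanity-check sketch, not part of splearn's API; the shapes should match, while the column order may depend on how the distributed vocabulary is assembled:

.. code:: python

import scipy.sparse as sp

# stack the collected blocks into one local sparse matrix
result_dist_local = sp.vstack(result_dist.collect())

print(result_local.shape)       # (n_documents, n_features)
print(result_dist_local.shape)  # same number of documents and features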
SparkHashingVectorizer
^^^^^^^^^^^^^^^^^^^^^^
.. code:: python
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from sklearn.feature_extraction.text import HashingVectorizer
X = [...] # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4)) # sc is SparkContext
local = HashingVectorizer()
dist = SparkHashingVectorizer()
result_local = local.fit_transform(X)
result_dist = dist.fit_transform(X_rdd) # SparseRDD
SparkTfidfTransformer
^^^^^^^^^^^^^^^^^^^^^
.. code:: python
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.pipeline import SparkPipeline
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.pipeline import Pipeline
X = [...] # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4)) # sc is SparkContext
local_pipeline = Pipeline((
('vect', HashingVectorizer()),
('tfidf', TfidfTransformer())
))
dist_pipeline = SparkPipeline((
('vect', SparkHashingVectorizer()),
('tfidf', SparkTfidfTransformer())
))
result_local = local_pipeline.fit_transform(X)
result_dist = dist_pipeline.fit_transform(X_rdd) # SparseRDD
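A fitted distributed pipeline can then be applied to further batches of text. The sketch below is an illustration under assumptions: it presumes SparkPipeline mirrors sklearn's Pipeline.transform, and new_texts is a hypothetical list of unseen documents:

.. code:: python

new_texts = [...]  # hypothetical list of unseen documents
new_rdd = ArrayRDD(sc.parallelize(new_texts, 4))

# assuming SparkPipeline follows sklearn's Pipeline interface,
# transform() applies the already fitted steps block by block
new_features = dist_pipeline.transform(new_rdd)  # expected: SparseRDD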
Distributed Classifiers
=======================
.. code:: python
from splearn.rdd import DictRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.svm import SparkLinearSVC
from splearn.pipeline import SparkPipeline
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline
X = [...] # list of texts
y = [...] # list of labels
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
columns=('X', 'y'),
dtype=[np.ndarray, np.ndarray])
local_pipeline = Pipeline((
('vect', HashingVectorizer()),
('tfidf', TfidfTransformer()),
('clf', LinearSVC())
))
dist_pipeline = SparkPipeline((
('vect', SparkHashingVectorizer()),
('tfidf', SparkTfidfTransformer()),
('clf', SparkLinearSVC())
))
local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))
y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])
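The distributed predictions can be gathered to the driver and scored with ordinary sklearn metrics. The sketch below assumes the prediction comes back as an ArrayRDD, so toarray() yields a flat label vector:

.. code:: python

import numpy as np
from sklearn.metrics import accuracy_score

y_true = np.array(y)
y_pred = y_pred_dist.toarray()  # assuming an ArrayRDD of predicted labels

print(accuracy_score(y_true, y_pred))
print(accuracy_score(y_true, y_pred_local))  # local baseline for comparison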
Distributed Model Selection
===========================
.. code:: python
from splearn.rdd import DictRDD
from splearn.grid_search import SparkGridSearchCV
from splearn.naive_bayes import SparkMultinomialNB
from sklearn.grid_search import GridSearchCV
from sklearn.naive_bayes import MultinomialNB
X = [...]
y = [...]
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
columns=('X', 'y'),
dtype=[np.ndarray, np.ndarray])
parameters = {'alpha': [0.1, 1, 10]}
fit_params = {'classes': np.unique(y)}
local_estimator = MultinomialNB()
local_grid = GridSearchCV(estimator=local_estimator,
param_grid=parameters)
estimator = SparkMultinomialNB()
grid = SparkGridSearchCV(estimator=estimator,
param_grid=parameters,
fit_params=fit_params)
local_grid.fit(X, y)
grid.fit(Z)
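After fitting, the winning hyperparameters can be inspected the same way as in sklearn; the distributed attributes below assume SparkGridSearchCV keeps GridSearchCV's interface:

.. code:: python

# sklearn's GridSearchCV exposes the best configuration directly
print(local_grid.best_params_, local_grid.best_score_)

# assuming SparkGridSearchCV inherits the same attributes
print(grid.best_params_, grid.best_score_)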
ROADMAP
=======
- [ ] Transparent API to support plain numpy and scipy objects (partially done in the transparent_api branch)
- [ ] Update all dependencies
- [ ] Use MLlib and ML packages more extensively (as they become more mature)
- [ ] Support Spark DataFrames
Special thanks
==============
- scikit-learn community
- spylearn community
- pyspark community
Similar Projects
================
- `Thunder <https://github.com/thunder-project/thunder>`_
- `Bolt <https://github.com/bolt-project/bolt>`_
.. |Build Status| image:: https://travis-ci.org/lensacom/sparkit-learn.png?branch=master
:target: https://travis-ci.org/lensacom/sparkit-learn
.. |PyPi| image:: https://img.shields.io/pypi/v/sparkit-learn.svg
:target: https://pypi.python.org/pypi/sparkit-learn
.. |Gitter| image:: https://badges.gitter.im/Join%20Chat.svg
:alt: Join the chat at https://gitter.im/lensacom/sparkit-learn
:target: https://gitter.im/lensacom/sparkit-learn?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge
.. |Gitential| image:: https://api.gitential.com/accounts/6/projects/75/badges/coding-hours.svg
:alt: Gitential Coding Hours
:target: https://gitential.com/accounts/6/projects/75/share?uuid=095e15c5-46b9-4534-a1d4-3b0bf1f33100&utm_source=shield&utm_medium=shield&utm_campaign=75