lensacom / sparkit-learn

PySpark + Scikit-learn = Sparkit-learn

For executing SparkRandomForestClassifier how should I create a BlockRDD #73

Open MyPythonGitHub opened 7 years ago

MyPythonGitHub commented 7 years ago

Hi, I am quite new to Sparkit-learn. In order to execute SparkRandomForestClassifier, I need to convert my input dataframe (created from columns retrieved from a Hive table) to a Spark BlockRDD. Could you please let me know how to do that?

Thanks !

taynaud commented 7 years ago

SparkRandomForestClassifier expects a DictRDD with an "X" ArrayRDD and a "y" ArrayRDD.

An ArrayRDD is an RDD of numpy arrays, so you have to build a small job to transform your DataFrame into two RDDs of numpy arrays.

In general, my models expect a list of dicts as input and use custom Pipelines with selectors to extract the relevant features.

Thus, my conversion routine looks like:

import numpy as np
from splearn.rdd import DictRDD

def mapper(partition):
    # Convert each Row of the partition into a plain dict of features
    as_dict = [r.asDict() for r in partition]
    # 'key' is the name of your label column -- you need to define it yourself
    targets = [d[key] for d in as_dict]
    # Pair each feature dict with its target: one (X, y) row per record
    return np.array([as_dict, targets]).T

new_rdd = dataset.rdd.mapPartitions(mapper)

# 'bsize' is the block size (records per numpy block) -- pick a value yourself
dict_rdd = DictRDD(new_rdd,
                   columns=('X', 'y'),
                   bsize=bsize,
                   dtype=[np.ndarray, np.ndarray])

You will need to convert your DataFrame to numpy arrays in mapper differently if your assumptions are not the same as mine.

Also be aware that the number-of-trees behavior is slightly different from scikit-learn's.

SparkRandomForestClassifier will train a distinct random forest with n_trees on each partition and then merge them. Thus if you use n_trees=500 and your dataframe has 10 partitions, you'll get a final RandomForest of 500 * 10 trees (see the sketch below).
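If you want to keep the merged ensemble at a given size, one option is to divide the requested tree count by the partition count before fitting. A minimal sketch, reusing the dataset/dict_rdd names from the snippet above and taking the per-partition behavior described here at face value (the target of 500 total trees is only illustrative):

    from splearn.ensemble import SparkRandomForestClassifier

    total_trees = 500  # desired size of the merged forest (illustrative)
    n_partitions = dataset.rdd.getNumPartitions()

    # Each partition trains its own forest of n_estimators trees,
    # so request roughly total_trees / n_partitions per partition.
    clf = SparkRandomForestClassifier(n_estimators=max(1, total_trees // n_partitions))
    clf.fit(dict_rdd)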

Best,

MyPythonGitHub commented 7 years ago

Thanks, Thomas, for your help! I am gradually getting there.

After executing this code (I have replaced 'dataset' with my dataframe name), I am getting the error: NameError: name 'bsize' is not defined

I set bsize = 5 (just a number I picked at random) and then I am getting the error:

NameError: global name 'key' is not defined

I know I am missing something obvious here, but could you please help me identify where I am going wrong?

Thanks again !

taynaud commented 7 years ago

Hello,

My code was just an example; it is tailored to my workflow, where big SparkPipelines expect dictionaries as input. You need to write the code that converts your DataFrame into an RDD of numpy arrays yourself.

See the quickstart https://github.com/lensacom/sparkit-learn

You can also look at the tests to see how to use it, for instance https://github.com/lensacom/sparkit-learn/blob/master/splearn/ensemble/tests/__init__.py and https://github.com/lensacom/sparkit-learn/blob/master/splearn/utils/testing.py
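For orientation, a DictRDD built directly from numpy arrays, roughly along the lines of the quickstart, might look like the sketch below (sc is assumed to be an existing SparkContext; the array shapes, bsize and n_estimators are arbitrary assumptions):

    import numpy as np
    from splearn.rdd import DictRDD
    from splearn.ensemble import SparkRandomForestClassifier

    # Toy data: 25 samples, 4 numeric features, binary labels (arbitrary assumption)
    X = np.arange(100).reshape((25, 4)).astype(float)
    y = np.arange(25) % 2

    X_rdd = sc.parallelize(X, 4)   # sc: existing SparkContext
    y_rdd = sc.parallelize(y, 4)   # same number of partitions as X_rdd

    Z = DictRDD((X_rdd, y_rdd),
                columns=('X', 'y'),
                bsize=5,
                dtype=[np.ndarray, np.ndarray])

    clf = SparkRandomForestClassifier(n_estimators=10)
    clf.fit(Z)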

MyPythonGitHub commented 7 years ago

Hi Thomas, thanks for your feedback! I am now facing an error which says: TypeError: float() argument must be a string or a number. I wanted to check with you whether the model accepts float data, and if so, could you please guide me in resolving this error?

Here is my code:

    # Creating Spark DataFrame
    features_input = sqlContext.sql("select feature1, feature2, label from input_table")

    # Replacing nulls with zeros
    features_input = features_input.na.fill(0)

    def mapper(partition):
        as_dict = [r.asDict() for r in partition]
        targets = [d['label'] for d in as_dict]
        return np.array([as_dict, targets]).T

    f_rdd = features_input.rdd.mapPartitions(mapper)

    dict_f_rdd = DictRDD(f_rdd,
                         columns=('X', 'y'),
                         bsize=3,
                         dtype=[np.ndarray, np.ndarray])
    clf = SparkRandomForestClassifier(n_estimators=500, n_jobs=-1)
    clf.fit(dict_f_rdd)

Here is my error message:

16/12/13 05:50:19 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "build/bdist.linux-x86_64/egg/splearn/ensemble/__init__.py", line 140, in
  File "/var/opt/teradata/anaconda4.1.1/anaconda/lib/python2.7/site-packages/sklearn/ensemble/forest.py", line 212, in fit
    X = check_array(X, dtype=DTYPE, accept_sparse="csc")
  File "/var/opt/teradata/anaconda4.1.1/anaconda/lib/python2.7/site-packages/sklearn/utils/validation.py", line 373, in check_array
    array = np.array(array, dtype=dtype, order=order, copy=copy)
TypeError: float() argument must be a string or a number

Here is my data:

Data: [(array([{'feature1': 0.0, 'feature2': 0.0, 'label': 0},
               {'feature1': 0.0, 'feature2': 0.0, 'label': 0},
               {'feature1': 0.0, 'feature2': 0.0, 'label': 0}], dtype=object), array([0, 0, 0])),
       (array([{'feature1': 0.0, 'feature2': 0.0, 'label': 0},
               {'feature1': 0.0, 'feature2': 0.0, 'label': 0},
               {'feature1': 0.0, 'feature2': 0.0, 'label': 0}], dtype=object), array([0, 0, 0])),
       (array([{'feature1': 0.0, 'feature2': 3.066666666666667, 'label': 0},
               {'feature1': 13.833333333333334, 'feature2': 13.833333333333334, 'label': 0},
               {'feature1': 0.0, 'feature2': 0.0, 'label': 0}], dtype=object), array([0, 0, 0])),
       (array([{'feature1': 0.0, 'feature2': 0.0, 'label': 0}], dtype=object), array([0]))]

taynaud commented 7 years ago

Does your model expect to fit on an array of dicts?

If not, you have to build an RDD containing input that your model accepts; a sketch follows.
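For instance, since scikit-learn's RandomForestClassifier needs a numeric feature matrix (the float() error comes from trying to cast the dicts of features), one option is to have the mapper emit numeric feature vectors instead of dicts. A minimal sketch under that assumption, reusing the feature1/feature2/label columns from the code above and assuming DictRDD splits 2-tuples into the 'X' and 'y' columns the same way it splits the two-element rows in the earlier example:

    import numpy as np
    from splearn.rdd import DictRDD
    from splearn.ensemble import SparkRandomForestClassifier

    def mapper(partition):
        for r in partition:
            d = r.asDict()
            # Numeric feature vector instead of a dict, so sklearn's check_array can cast it
            yield (np.array([d['feature1'], d['feature2']], dtype=float), d['label'])

    f_rdd = features_input.rdd.mapPartitions(mapper)
    dict_f_rdd = DictRDD(f_rdd,
                         columns=('X', 'y'),
                         bsize=3,
                         dtype=[np.ndarray, np.ndarray])

    clf = SparkRandomForestClassifier(n_estimators=500, n_jobs=-1)
    clf.fit(dict_f_rdd)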