EclairJS / eclairjs-nashorn

JavaScript API for Apache Spark
Apache License 2.0
94 stars 11 forks source link

Poor performance of LAMBDA function with large Datasets #265

Closed billreed63 closed 8 years ago

billreed63 commented 8 years ago

When processing large datasets the performance of LAMBDA functions is 90 times slower than a compared to Java.

Processing the movieLens ratings dataset Full: 22,000,000 ratings and 580,000 tag applications applied to 33,000 movies by 240,000 users. Using the Java program


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.sql.SQLContext;
import java.util.Date;

public class LargeDataset {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("movie recommender").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        long start = new Date().getTime();
        JavaRDD complete_ratings_raw_data = jsc.textFile("examples/data/mllib/ml-latest/ratings.csv");
        String complete_ratings_raw_data_header = complete_ratings_raw_data.take(1).get(0).toString();
        JavaRDD complete_ratings_data = complete_ratings_raw_data.filter(new Function<String, Boolean>() {
            public Boolean call(String line) {

                if (line.equals(complete_ratings_raw_data_header)) {
                    return false;
                } else {
                    return true;
                }
            }
        });

        System.out.println("There are recommendations in the complete dataset:  " + complete_ratings_data.count());
        long end = new Date().getTime();
        long time = end - start;
        System.out.println("Execution time: " + time + " milliseconds");

    }
}

Takes 2492 milliseconds to process 22884377 dataset

Where the JavaScript equivalent code:


    var Tuple = require('eclairjs/Tuple');
    var ALS = require('eclairjs/mllib/recommendation/ALS');
    var Rating = require('eclairjs/mllib/recommendation/Rating');

    /*
     In order to build our recommender model, we will use the complete dataset.

     */
    var complete_ratings_raw_data = sc.textFile("examples/data/mllib/ml-latest/ratings.csv");
    print("complete_ratings_raw_data count  " + complete_ratings_raw_data.count());
    var complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0];
    print("complete_ratings_raw_data_header " + complete_ratings_raw_data_header);
    var start = new Date().getTime();
    var filterHeader = function(line) {
        return line != "userId,movieId,rating,timestamp";
    }
   var complete_ratings_data = complete_ratings_raw_data.filter(function(line) {
            // filters out the header
            return line != "userId,movieId,rating,timestamp"; //complete_ratings_raw_data_header;
        })

    print("There are recommendations in the complete dataset:  " + complete_ratings_data.count());

    var end = new Date().getTime();
    var time = end - start;
    print('Execution time: ' + time + " milliseconds");

Take 93557 milliseconds to process 22884377 datasets.

EclairJS commented 8 years ago

Can we find out where the time is being taken? David   ----- Original message -----From: Bill Reed notifications@github.comTo: EclairJS/eclairjs-nashorn eclairjs-nashorn@noreply.github.comCc:Subject: [EclairJS/eclairjs-nashorn] Poor performance of LAMBDA function with large Datasets (#265)Date: Tue, May 24, 2016 7:41 AM  When processing large datasets the performance of LAMBDA functions is 90 times slower than a compared to Java. Processing the movieLens ratings dataset Full: 22,000,000 ratings and 580,000 tag applications applied to 33,000 movies by 240,000 users. Using the Java program import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.mllib.recommendation.Rating;import org.apache.spark.sql.SQLContext;import java.util.Date;public class LargeDataset {    public static void main(String[] args) {        SparkConf conf = new SparkConf().setAppName("movie recommender").setMaster("local[_]");        JavaSparkContext jsc = new JavaSparkContext(conf);        long start = new Date().getTime();        JavaRDD complete_ratings_raw_data = jsc.textFile("examples/data/mllib/ml-latest/ratings.csv");        String complete_ratings_raw_data_header = complete_ratings_raw_data.take(1).get(0).toString();        JavaRDD complete_ratings_data = complete_ratings_raw_data.filter(new Function<String, Boolean>() {            public Boolean call(String line) {                if (line.equals(complete_ratings_raw_data_header)) {                    return false;                } else {                    return true;                }            }        });        System.out.println("There are recommendations in the complete dataset:  " + complete_ratingsdata.count());        long end = new Date().getTime();        long time = end - start;        System.out.println("Execution time: " + time + " milliseconds");    }}  Takes 2492 milliseconds to process 22884377 dataset Where the JavaScript equivalent code:     var Tuple = require('eclairjs/Tuple');    var ALS = require('eclairjs/mllib/recommendation/ALS');    var Rating = require('eclairjs/mllib/recommendation/Rating');    /     In order to build our recommender model, we will use the complete dataset.     */    var complete_ratings_raw_data = sc.textFile("examples/data/mllib/ml-latest/ratings.csv");    print("complete_ratings_raw_data count  " + complete_ratings_raw_data.count());    var complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0];    print("complete_ratings_raw_data_header " + complete_ratings_raw_data_header);    var start = new Date().getTime();    var filterHeader = function(line) {        return line != "userId,movieId,rating,timestamp";    }   var complete_ratings_data = complete_ratings_raw_data.filter(function(line) {            // filters out the header            return line != "userId,movieId,rating,timestamp"; //complete_ratings_raw_data_header;        })    print("There are recommendations in the complete dataset:  " + complete_ratings_data.count());    var end = new Date().getTime();    var time = end - start;    print('Execution time: ' + time + " milliseconds"); Take 93557 milliseconds to process 22884377 datasets. —You are receiving this because you are subscribed to this thread.Reply to this email directly or view it on GitHub

billreed63 commented 8 years ago

The root cause of the performance issue is that we eval() the LAMBDA function every time it is used example:

import javax.script.Invocable;
import javax.script.ScriptEngine;
import org.apache.commons.lang.ArrayUtils;
import org.apache.spark.api.java.function.Function;

public class JSFunction implements Function {
    private String func = null;
    private Object args[] = null;

    public JSFunction(String func, Object[] o) {
        this.func = func;
        this.args = o;
    }

    @SuppressWarnings({ "null", "unchecked" })
    @Override
    public Object call(Object o) throws Exception {
        ScriptEngine e =  NashornEngineSingleton.getEngine();
        Invocable invocable = (Invocable) e;

        Object params[] = {this.func, o};

        if (this.args != null && this.args.length > 0 ) {
            params = ArrayUtils.addAll(params, this.args);
        }

        Object ret = invocable.invokeFunction("Utils_invoke", params);
        //return Utils.jsToJava(ret);
        return ret;
    }
}

and

function Utils_invoke(func) {
    var fn = eval(func);
    var a = Array.prototype.slice.call(arguments);
    var args = (arguments.length > 1)
        ? a.slice(1).map(function (arg) {
        return Serialize.javaToJs(arg);
        return arg;
    })
        : [];

    var ret = null;
    try {
        ret = Serialize.jsToJava(fn.apply(this, args));
        ret = fn.apply(this, args);
    } catch (err) {
        print("error invoking function");
        print(func);
        print(err);
        throw err;
    }

    return ret;
};

The LAMBDA function is passed into the Java function constructor as a string:

 public JSFunction(String func, Object[] o) {
        this.func = func;
        this.args = o;
    }

The constructor is only called once when setting up to process the Dataset example RDD.filter(LAMBDA) where the Java method call() is called once for each dataset. So ideally we would like to eval() the LAMBDA function once in the constructor of the java function. For example:

 public JSFilterFunction(String func, Object[] o) throws Exception{
        this.fn = engine.eval(func);
        this.args = o;

    }

But if we do this we will get java.lang.RuntimeException: org.apache.spark.SparkException: Task not serializable

So we need to do the next best thing and eval() the LAMBDA only once in the Java functions call method, and save it for future use

    public Object call(Object o) throws Exception{
        ScriptEngine e =  NashornEngineSingleton.getEngine();
        if (this.fn == null) {
            this.fn = e.eval(func);
        }
        Invocable invocable = (Invocable) e;

        Object params[] = {this.fn, o};

        if (this.args != null && this.args.length > 0 ) {
            params = ArrayUtils.addAll(params, this.args);
        }

        Object ret = invocable.invokeFunction("Utils_invoke", params);
        return ret;
    }

We also need to make a change to Utils_invoke(func) to expect func to be a function not a string

function Utils_invoke(func) {
    var fn = func; //eval(func);
    var a = Array.prototype.slice.call(arguments);
    var args = (arguments.length > 1)
        ? a.slice(1).map(function (arg) {
        return Serialize.javaToJs(arg);
        return arg;
    })
        : [];

    var ret = null;
    try {
        ret = Serialize.jsToJava(fn.apply(this, args));
        ret = fn.apply(this, args);
    } catch (err) {
        print("error invoking function");
        print(func);
        print(err);
        throw err;
    }

    return ret;
};

With these changes we see the following numbers from our JavaScript code Take 12430 milliseconds to process 22884377 datasets. While still 6 times slower than the Java version I think this is acceptable for a JavaScript version of the code.

This change will need to be applied to all our JSFunction classes that support JavaScript LAMBDAs

Brian-Burns-Bose commented 8 years ago

This looks good to me.

On top of this, there are lot of situations where we serialize the java version of the arguments to JS only to turn around and serialize back to Java to invoke the native version of the function.

billreed63 commented 8 years ago

Due a little "googeling" I have found this Scala vs. Python for Apache Spark which states "Scala programming language is 10 times faster than Python for data analysis and processing" and "The performance is mediocre when Python programming code is used to make calls to Spark libraries but if there is lot of processing involved than Python code becomes much slower than the Scala equivalent code. " With the moveLen 100M dataset we are at 8 times slower that Java/Scala code.