tdebatty / spark-knn-graphs

Spark algorithms for building k-nn graphs
MIT License

SparkException: Task not serializable error when operation performed on generated JavaPairRDD<Node<SparseIntegerVector>, NeighborList> #5

Closed mohsinrasool closed 7 years ago

mohsinrasool commented 7 years ago

I am trying to perform some operations on the RDD generated by the k-nn graph classes, but the call fails with org.apache.spark.SparkException: Task not serializable. I believe this is because one of the classes involved is not serializable.

Does anyone have an idea how I can resolve this, please? My code is below:

import info.debatty.java.graphs.NeighborList;
import info.debatty.java.graphs.Node;
import info.debatty.java.stringsimilarity.KShingling;
import info.debatty.java.utils.SparseIntegerVector;
import info.debatty.spark.knngraphs.builder.DistributedGraphBuilder;
import info.debatty.spark.knngraphs.builder.LSHSuperBitSparseIntegerVector;

import java.util.ArrayList;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;

import scala.Tuple2;

public class CalculateSimilarityGraph2 {

    private String[] args;

    public CalculateSimilarityGraph2(String[] args) {
        this.args = args;
    }

    /**
     * Create KNN Graph based on similarity measure, input file captured from args[0]
     * @throws Exception
     */
    public void run() throws Exception {

        if(args.length == 0)
            return;

        // Configure spark instance
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Similarity Vector"));

        // Read the file
        ArrayList<String> strings = DistributedGraphBuilder.readFile(this.args[0]);

        // Convert the strings to nodes of SparseIntegerVectors
        // using k-shingling
        // we will split strings in 3-grams (sequences of 3 characters)
        KShingling ks = new KShingling(3);
        ArrayList<Node<SparseIntegerVector>> data =  new ArrayList<Node<SparseIntegerVector>>();
        for (int i = 0; i < strings.size(); i++) {
            String s = strings.get(i);
            s = this.cleanString(s);
            data.add(new Node<SparseIntegerVector>(
                    s,                      // id
                    ks.getProfile(s).getSparseVector()) );   // value
        }

        int dim = ks.getDimension();

        JavaRDD<Node<SparseIntegerVector>> nodes = sc.parallelize(data);

        // Configure LSHSuperBit graph builder
        LSHSuperBitSparseIntegerVector gbuilder = 
                new LSHSuperBitSparseIntegerVector();
        gbuilder.setK(10);
        gbuilder.setStages(2);
        gbuilder.setBuckets(10);
        // LSH hashing requires the dimensionality
        gbuilder.setDim(dim);

        // By default, LSHSuperBit graph builder uses cosine similarity
        // but another similarity measure can be defined if needed...

        // Build the graph...
        JavaPairRDD<Node<SparseIntegerVector>, NeighborList> graph = gbuilder.computeGraph(nodes);        

        // The following line throws org.apache.spark.SparkException: Task not serializable
        graph.foreach(new VoidFunction<Tuple2<Node<SparseIntegerVector>, NeighborList>>(){

            private static final long serialVersionUID = 8175395057830217005L;

            @Override
            public void call(Tuple2<Node<SparseIntegerVector>, NeighborList> t)
                    throws Exception {
                // Doing nothing, just testing.

            }

        });

        System.out.println(graph.first());
        System.out.println("Output saved to "+this.args[0].replace('.', '_'));
    }

    /**
     * Cleans string by removing URLs
     * @param s String to be cleaned
     * @return
     */
    private String cleanString(String s) {
        // Remove URL
        return s.replaceAll("https?://\\S+\\s?", "");
    }

    public static void main(String[] args) {

        CalculateSimilarityGraph2 obj = new CalculateSimilarityGraph2(args);
        try {
            obj.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
tdebatty commented 7 years ago

Hi!

You are using an anonymous class: graph.foreach(new VoidFunction<Tuple2<Node, NeighborList>>(){

The anonymous class actually holds a hidden reference to its enclosing class (CalculateSimilarityGraph2). Hence your CalculateSimilarityGraph2 object will also be serialized to be sent to the cluster...

Two solutions:

  1. make CalculateSimilarityGraph2 serializable (implements Serializable)
  2. refactor your anonymous class into a separate top-level class (see the sketch below)
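For illustration, a minimal sketch of option 2, assuming the same types as in the question (the class name NoOpVisitor is made up for this example). Option 1 only requires changing the class declaration to public class CalculateSimilarityGraph2 implements Serializable.

import info.debatty.java.graphs.NeighborList;
import info.debatty.java.graphs.Node;
import info.debatty.java.utils.SparseIntegerVector;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

// A top-level function class captures no hidden reference to
// CalculateSimilarityGraph2, so only this small object is serialized.
// VoidFunction already extends java.io.Serializable.
// NoOpVisitor is a hypothetical name, used here for illustration only.
public class NoOpVisitor
        implements VoidFunction<Tuple2<Node<SparseIntegerVector>, NeighborList>> {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<Node<SparseIntegerVector>, NeighborList> t)
            throws Exception {
        // Doing nothing, just testing (same as the original anonymous class).
    }
}

In run(), the anonymous class is then replaced with:

graph.foreach(new NoOpVisitor());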
mohsinrasool commented 7 years ago

Thanks man, I made CalculateSimilarityGraph2 serializable and it solved the issue.