lintool / warcbase

Warcbase is an open-source platform for managing and analyzing web archives
http://warcbase.org/

K-Means Clustering #226

Open ianmilligan1 opened 8 years ago

ianmilligan1 commented 8 years ago

As discussed, we're interested in incorporating K-Means clustering into warcbase. Can we take a collection (part of GeoCities, for example, or a smaller Archive-It collection) and separate it into k clusters?

@yb1 has offered to tackle this.

youngbink commented 8 years ago

Pull request (https://github.com/lintool/warcbase/pull/230)

Usage example:

import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.{RecordLoader, ExtractClusters}

val recs=RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/", sc)
  .keepUrlPatterns(Set("http://geocities.com/EnchantedForest/.*".r))

val clusters = ExtractClusters(recs, sc)
             .topNWords("GEO_ENCHANTED_FOREST_TOP_N", sc)
             .computeLDA("GEO_ENCHANTED_FOREST_LDA", sc)
             .saveSampleDocs("GEO_ENCHANTED_FOREST_LDA", sc)

APIs:

ExtractClusters(records: RDD[ArchiveRecord], sc: SparkContext, k: Int = 20, numIterations: Int = 20, minDocThreshold: Int = 5)

This applies stemming, stopword removal, and TF-IDF weighting, then builds k-means clusters. k is the number of clusters you want, and minDocThreshold ignores words that appear in only a very small number of documents.
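For a sense of what that pipeline looks like, here is a rough sketch in plain Spark 1.x MLlib. This is not the ExtractClusters source: real stemming is omitted, the stopword list is a stand-in, and the function name is illustrative.

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

// Rough sketch only: tokenize, drop stopwords, weight with TF-IDF, then run k-means.
def clusterDocs(docs: RDD[String], k: Int = 20, numIterations: Int = 20,
                minDocThreshold: Int = 5): KMeansModel = {
  val stopwords = Set("the", "and", "of", "a", "to", "in")   // stand-in stopword list
  val tokens = docs.map(_.toLowerCase.split("\\W+").toSeq.filterNot(stopwords))
  val tf = new HashingTF().transform(tokens)                              // term frequencies
  tf.cache()
  val tfidf = new IDF(minDocFreq = minDocThreshold).fit(tf).transform(tf) // ignore very rare words
  KMeans.train(tfidf, k, numIterations)
}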

getSampleDocs(sc: SparkContext, numDocs: Int=10)

This function returns an RDD of the web page contents that are closest to the cluster centroids. numDocs determines how many documents to return for each cluster. This function is useful when you want to apply further operations to the RDD (e.g. extracting data from the content).
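As a hypothetical example of such a follow-up operation (the exact element type of the returned RDD is not spelled out in the PR, so toString is used defensively here):

// Hypothetical follow-up on getSampleDocs; the element type is assumed, not confirmed by the PR.
val sampleDocs = ExtractClusters(recs, sc).getSampleDocs(sc, numDocs = 5)
// e.g. extract something from the content: rough word counts for each sampled document
val wordCounts = sampleDocs.map(doc => doc.toString.split("\\s+").length)
wordCounts.take(5).foreach(println)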

saveSampleDocs(output: String, sc: SparkContext, numDocs: Int=10)

This simply saves the result of calling getSampleDocs (after repartitioning). Each output file contains the sample docs for the corresponding cluster (part-00000 for cluster 1, and so on).

computeLDA(output: String, sc: SparkContext, numTopics: Int = 3, numWordsPerTopic: Int = 10)

This saves the LDA results to the folder named by output. numWordsPerTopic defines the number of most important words kept for each topic. Each output file contains the LDA result for the corresponding cluster (part-00000 for cluster 1, and so on). The output format is (topic, array of words, weight), e.g. (0,WrappedArray(green),0.05061560896291224).
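Once the job finishes, those saved lines can be pulled back into the shell and parsed; a small sketch that assumes the exact line format shown above (the regex is just an illustration, not part of warcbase):

// Parse saved LDA lines of the form (topic,WrappedArray(word1, word2, ...),weight).
val LdaLine = """\((\d+),WrappedArray\((.*)\),([\d.Ee+-]+)\)""".r
sc.textFile("GEO_ENCHANTED_FOREST_LDA/part-00000").collect().foreach {
  case LdaLine(topic, words, weight) => println(s"topic $topic: $words (weight $weight)")
  case other                         => println(s"could not parse: $other")
}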

topNWords(output: String, sc: SparkContext, limit: Int = 10)

This saves the most frequent words in each cluster. limit defines the number of words shown for each cluster.
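Like the other save calls, this writes one part-file per cluster, so a cluster's words can be inspected directly; a minimal sketch using the topNWords output path from the usage example above:

// Inspect the most frequent words saved for cluster 1 (part-00000 maps to cluster 1).
val cluster1TopWords = sc.textFile("GEO_ENCHANTED_FOREST_TOP_N/part-00000")
cluster1TopWords.take(10).foreach(println)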

ianmilligan1 commented 8 years ago

Great! Will test this.

ianmilligan1 commented 8 years ago

I ran

import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.{RecordLoader, ExtractClusters}

val recs=RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/", sc)
  .keepUrlPatterns(Set("http://geocities.com/EnchantedForest/.*".r))

val clusters = ExtractClusters(recs, sc)
             .topNWords("GEO_ENCHANTED_FOREST_TOP_N", sc)
             .computeLDA("GEO_ENCHANTED_FOREST_LDA", sc)
             .saveSampleDocs("GEO_ENCHANTED_FOREST_LDA", sc)

Using this command on camalon:

spark-shell --jars ~/git/warcbase/target/warcbase-0.1.0-SNAPSHOT-fatjar.jar --num-executors 75 --executor-cores 5 --executor-memory 20G --driver-memory 10G

I really haven't had any luck. Tons and tons of errors.

Did you get lots of verbose errors when running this, @yb1? Is this a Spark 1.5.1 vs. 1.6 issue? Just double-checking.

I will keep it running a bit longer, but there are lots of failures.

youngbink commented 8 years ago

The errors were deserialization errors. It's odd that they occurred, because I had run into those errors before and fixed them.

I've tried running it again a couple of times, and it worked (except for LDA, which is still running).

Please try again with Spark 1.6.1. Also, to avoid memory errors, please add "--conf spark.yarn.executor.memoryOverhead=4096" when you start the shell.
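With that flag added to the earlier command, the full launch would look something like this (everything else unchanged from the command above):

spark-shell --jars ~/git/warcbase/target/warcbase-0.1.0-SNAPSHOT-fatjar.jar --num-executors 75 --executor-cores 5 --executor-memory 20G --driver-memory 10G --conf spark.yarn.executor.memoryOverhead=4096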

Notes:

  1. Testing for a good k. If you'd like to find a good k for k-means, I added an option to test against different k values. It prints the cost (the sum of squared distances of points to their nearest center) for each k; k can then be picked using the elbow method or another method. (For k-means, the cost always decreases as k increases, until k exceeds the size of the dataset, but there is a point where the decrease in cost slows down quite rapidly. The elbow method picks that point for k. For more details, see https://en.wikipedia.org/wiki/Determining_the_number_of_clusters_in_a_data_set.)

     val clusters = ExtractClusters(recs, sc, testK=true, maxK=50, stepK=10, minK=5)

     This computes the cost for each k from minK up to maxK, increasing k by stepK each time. (A rough sketch of picking k from the printed costs appears at the end of this comment.)
  2. computeLDA takes significantly more time than the other operations. For the EnchantedForest dataset with 80 executors, computeLDA took more than 24 hours, whereas creating the k clusters, extracting the most frequent words, and extracting the samples closest to the centroids each took less than 2 hours.

Updated usage example:
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.{RecordLoader, ExtractClusters}

val recs=RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/", sc)
  .keepUrlPatterns(Set("http://geocities.com/EnchantedForest/.*".r))

val clusters = ExtractClusters(recs, sc)
             .topNWords("GEO_ENCHANTED_FOREST_TOP_N", sc)
             .saveSampleDocs("GEO_ENCHANTED_FOREST_SAMPLE", sc)
             .computeLDA("GEO_ENCHANTED_FOREST_LDA", sc)