vieyahn2017 / iBlog

44 stars 0 forks source link

3.23 Spark学习—FPGrowth算法 #262

Closed vieyahn2017 closed 5 years ago

vieyahn2017 commented 5 years ago

Spark的模式挖掘—FPGrowth算法 https://blog.csdn.net/oxuzhenyi/article/details/72850850

vieyahn2017 commented 5 years ago

实验知识点 Scala 基础编程
RDD的基本操作。Transformation和Actions
Spark Mlib的FPGrowth的算法应用

vieyahn2017 commented 5 years ago

关联规则用于表示数据内隐藏的关联性。比如说上面说到的故事。啤酒和尿布湿的关系,买尿布湿的消费者往往也会买啤酒。这就表明了数据之间的某种隐藏联系。但是分开看两者没有任何联系。关联规则算法发展到现在一共有三个算法:FP-Tree算法、Apriori算法、Eclat算法,这三个算法我们主要是讲解FP-Tree算法。FP-Tree是一种不产生候选模式而采用频繁模式增长的方法挖掘频繁模式的算法;此算法只扫描两次,第一次扫描数据是得到频繁项集,第二次扫描是利用支持度过滤掉非频繁项,同时生成FP树。然后后面的模式挖掘就是在这棵树上进行。此算法与Apriori算法最大不同的有两点:不产生候选集,只遍历两次数据,大大提升了效率。

下面我们就来讲讲该算法的实现过程:

假设有这么一批数据:

此处输入图片的描述

第一次扫描数据得到频繁项集合,统计所有商品出现的次数:

此处输入图片的描述

假设最小支持度为3,那调整后的频繁项集合就是:

此处输入图片的描述

第二次遍历数据构建频繁项集合树:

此处输入图片的描述

FPTree建立的规则就是树的根,为根节点,ROOT,定义为NULL,然后将数据里面的每一条数据进行插入节点操作,每个节点包含一个名称,和一个次数的属性,例如上图的a:4,a就是Item中的名称,4代表的是出现的次数。如果插入新事务时,树中已经包含该节点,这不用创建树节点,直接在节点上的次数加一就行。

vieyahn2017 commented 5 years ago

获取数据集

wget http://labfile.oss.aliyuncs.com/courses/815/Groceries.txt
vieyahn2017 commented 5 years ago
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("FPGrowthTest")
val sc = new SparkContext(conf)

// val sc = org.apache.spark.repl.Main.sparkContext  
// spark-shell

val data = sc.textFile("file:////usr/java/Groceries.txt")
// data.take(10).foreach(println)

val dataNoHead = data.filter(line => !line.contains("items"))
// dataNoHead.take(5).foreach(println)

val dataS = dataNoHead.map(line => line.split("\\{"))
val dataGoods = dataS.map(s => s(1).replace("}\"",""))
val fpData = dataGoods.map(_.split(",")).cache
// fpData.take(5).foreach(line => line.foreach(print))

// 实例化FPGrowth并且设置支持度为0.05,不满足该支持度的数据将被去除和分区为3。
val fpGroup = new FPGrowth().setMinSupport(0.05).setNumPartitions(3)

val fpModel = fpGroup.run(fpData)
val freqItems = fpModel.freqItemsets.collect

freqItems.foreach(f=>println("FrequentItem: "+f.items.mkString(",")+" OccurrenceFrequency: "+f.freq))
// FrequentItem:频繁项,OccurrenceFrequency:出现次数

// 我们看到用户ID为3的用户只购买了牛奶(whole milk),我们可以向他推荐什么商品呢。
val userID = 2
val usrList = fpData.take(3)( userID)
var goodsFreq = 0L

for(goods <- freqItems){
  if(goods.items.mkString == usrList.mkString){
    goodsFreq = goods.freq
  }
}
println(GoodsNumber:" + goodsFreq)

// 由于数据量的原因,在这里我们置信度设置为0.1,当商品的置信度大于0.1这个阈值,我们就将其推荐给用户。
// 在推荐过程中需要去除用户已经购买了的商品。

for(f <- freqItems){
  if(f.items.mkString.contains(usrList.mkString) && f.items.size > usrList.size) {
    val conf:Double = f.freq.toDouble / goodsFreq.toDouble
    if(conf >= 0.1) {
      var item = f.items
      for (i <- 0 until usrList.size) {
        item = item.filter(_ != usrList(i)) 
      }
      for (str <- item) {
        println(str+"  ==="+conf)
      }
    }
  }
}
vieyahn2017 commented 5 years ago

//结果如下:

yogurt  ===0.2192598487863112
other vegetables  ===0.29287703939514526
rolls/buns  ===0.2216474333465977

结果就是推荐的商品,后面就是其置信度的值。
我们看看当3号用户买牛奶的时候会给我们推荐什么商品?

Yogurt: 酸乳酪 vegettables: 菜蔬 rolls/buns: 小面包或点心

看到推荐的这些商品,是不是感觉和牛奶很配呢。

vieyahn2017 commented 5 years ago

Groceries.txt
文件预览

"","items"
"1","{citrus fruit,semi-finished bread,margarine,ready soups}"
"2","{tropical fruit,yogurt,coffee}"
"3","{whole milk}"
"4","{pip fruit,yogurt,cream cheese ,meat spreads}"
"5","{other vegetables,whole milk,condensed milk,long life bakery product}"
"6","{whole milk,butter,yogurt,rice,abrasive cleaner}"
"7","{rolls/buns}"
"8","{other vegetables,UHT-milk,rolls/buns,bottled beer,liquor (appetizer)}"
"9","{pot plants}"
"10","{whole milk,cereals}"
"11","{tropical fruit,other vegetables,white bread,bottled water,chocolate}"
"12","{citrus fruit,tropical fruit,whole milk,butter,curd,yogurt,flour,bottled water,dishes}"
"13","{beef}"
"14","{frankfurter,rolls/buns,soda}"
"15","{chicken,tropical fruit}"
"16","{butter,sugar,fruit/vegetable juice,newspapers}"
"17","{fruit/vegetable juice}"
"18","{packaged fruit/vegetables}"
"19","{chocolate}"
"20","{specialty bar}"
"21","{other vegetables}"
"22","{butter milk,pastry}"
"23","{whole milk}"
"24","{tropical fruit,cream cheese ,processed cheese,detergent,newspapers}"
"25","{tropical fruit,root vegetables,other vegetables,frozen dessert,rolls/buns,flour,sweet spreads,salty snack,waffles,candy,bathroom cleaner}"
"26","{bottled water,canned beer}"

"9827","{citrus fruit,herbs,other vegetables,dessert,sugar,shopping bags}"
"9828","{frankfurter,tropical fruit,other vegetables,whole milk,frozen meals,rolls/buns,detergent,napkins,newspapers}"
"9829","{sausage,butter,rolls/buns,pickled vegetables,soda,fruit/vegetable juice,waffles}"
"9830","{tropical fruit,other vegetables,domestic eggs,zwieback,ketchup,soda,dishes}"
"9831","{sausage,chicken,beef,hamburger meat,citrus fruit,grapes,root vegetables,whole milk,butter,whipped/sour cream,flour,coffee,red/blush wine,salty snack,chocolate,hygiene articles,napkins}"
"9832","{cooking chocolate}"
"9833","{chicken,citrus fruit,other vegetables,butter,yogurt,frozen dessert,domestic eggs,rolls/buns,rum,cling film/bags}"
"9834","{semi-finished bread,bottled water,soda,bottled beer}"
"9835","{chicken,tropical fruit,other vegetables,vinegar,shopping bags}"
vieyahn2017 commented 5 years ago

另外搜到一个

基于Hadoop&Spark的关联规则实践

https://github.com/NAMZseng/Spark_FPGrowth

vieyahn2017 commented 5 years ago

【机器学习系列2】FPGrowth算法与spark实现

2017年09月07日 17:22:17 江南小白龙 阅读数:1405 https://blog.csdn.net/zongzhiyuan/article/details/77883641

原理

基础 支持度 支持度是指在所有项集中{X, Y}出现的可能性,即项集中同时含有X和Y的概率: 该指标作为建立强关联规则的第一个门槛,衡量了所考察关联规则在“量”上的多少。

置信度 置信度表示在先决条件X发生的条件下,关联结果Y发生的概率: 这是生成强关联规则的第二个门槛,衡量了所考察的关联规则在“质”上的可靠性。

提升度 提升度表示在含有X的条件下同时含有Y的可能性与没有X这个条件下项集中含有Y的可能性之比,该指标与置信度同样衡量规则的可靠性,可以看作是置信度的一种互补指标。

FPGrowth算法 FP-Growth(频繁模式增长)算法是韩家炜在2000年提出的关联分析算法,它采取如下分治策略:将提供频繁项集的数据库压缩到一棵频繁模式树(FP-Tree),但仍保留项集关联信息

该算法和Apriori算法最大的不同有两点:

第一,不产生候选集,

第二,只需要两次遍历数据库,大大提高了效率。

FP数构造 事务数据库建立(很关键)

注意:

许多项集有公共项,而且出现次数越多的项越可能是公共项,因此按出现次数由多到少的顺序可以节省空间,实现压缩存储。

创建根结点和频繁项目表

加入第一个事务(I2,I1,I5)

加入第二个事务(I2,I4)

加入第三个事务(I2,I3)

以此类推加入第5、6、7、8、9个事务。

加入第九个事务(I2,I1,I3)

FP数挖掘 FpTree建好后,就可以进行频繁项集的挖掘,挖掘算法称为FpGrowth(Frequent Pattern Growth)算法。

挖掘从表头header的最后一个项开始,以此类推。下面以I5、I3为例进行挖掘

对于I5,得到条件模式基:<(I2,I1:1)>、<I2,I1,I3:1>

构造条件FP-tree:

得到I5频繁项集:{{I2,I5:2},{I1,I5:2},{I2,I1,I5:2}}

I5的情况是比较简单的,因为I5对应的条件FP-树是单路径的,I3稍微复杂一点。I3的条件模式基是(I2 I1:2), (I2:2),(I1:2),生成的条件FP-树如下图:

I3的条件FP-树仍然是一个多路径树,首先把模式后缀I3和条件FP-树中的项头表中的每一项取并集,得到一组模式{I2 I3:4, I1 I3:4},但是这一组模式不是后缀为I3的所有模式。还需要递归调用FP-growth,模式后缀为{I1,I3},{I1,I3}的条件模式基为{I2:2},其生成的条件FP-树如下图所示。

在FP_growth中把I2和模式后缀{I1,I3}取并得到模式{I1 I2 I3:2}。 理论上还应该计算一下模式后缀为{I2,I3}的模式集,但是{I2,I3}的条件模式基为空,递归调用结束。最终模式后缀I3的支持度>2的所有模式为:{ I2 I3:4, I1 I3:4, I1 I2 I3:2}。

Spark实现

FPGrowth源码包括:FPGrowth、FPTree两部分。 其中FPGrowth中包括:run方法、genFreqItems方法、genFreqItemsets方法、genCondTransactions方法; FPTree中包括:add方法、merge方法、project方法、getTransactions方法、extract方法。


// run 计算频繁项集
  /**
   * Computes an FP-Growth model that contains frequent itemsets.
   * @param data input data set, each element contains a transaction
   * @return an [[FPGrowthModel]]
   */
  def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item] = {
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("Input data is not cached.")
    }
    val count = data.count()//计算事务总数
    val minCount = math.ceil(minSupport * count).toLong//计算最小支持度
    val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
    val partitioner = new HashPartitioner(numParts)
    //freqItems计算满足最小支持度的Items项
    val freqItems = genFreqItems(data, minCount, partitioner)
    //freqItemsets计算频繁项集
    val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
    new FPGrowthModel(freqItemsets)
  }
// genFreqItems计算满足最小支持度的Items项(并且排序)
  /**
   * Generates frequent items by filtering the input data using minimal support level.
   * @param minCount minimum count for frequent itemsets
   * @param partitioner partitioner used to distribute items
   * @return array of frequent pattern ordered by their frequencies
   */
  private def genFreqItems[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
      partitioner: Partitioner): Array[Item] = {
    data.flatMap { t =>
      val uniq = t.toSet
      if (t.size != uniq.size) {
        thrownew SparkException(s"Items in a transaction must be unique but got ${t.toSeq}.")
      }
      t
    }.map(v => (v, 1L))
      .reduceByKey(partitioner, _ + _)
      .filter(_._2 >= minCount)
      .collect()
      .sortBy(-_._2)
      .map(_._1)
    }//统计每个Items项的频次,对小于minCount的Items项过滤,返回Items项。
// genFreqItemsets计算频繁项集:生成FP-Trees,挖掘FP-Trees
  /**
   * Generate frequent itemsets by building FP-Trees, the extraction is done on each partition.
   * @param data transactions
   * @param minCount minimum count for frequent itemsets
   * @param freqItems frequent items
   * @param partitioner partitioner used to distribute transactions
   * @return an RDD of (frequent itemset, count)
   */
  private def genFreqItemsets[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
      freqItems: Array[Item],
      partitioner: Partitioner): RDD[FreqItemset[Item]] = {
    val itemToRank = freqItems.zipWithIndex.toMap//表头
    data.flatMap { transaction =>
      genCondTransactions(transaction, itemToRank, partitioner)
    }.aggregateByKey(new FPTree[Int], partitioner.numPartitions)( //生成FP树
      (tree, transaction) => tree.add(transaction, 1L), //FP树增加一条事务
      (tree1, tree2) => tree1.merge(tree2)) //FP树合并
    .flatMap { case (part, tree) =>
      tree.extract(minCount, x => partitioner.getPartition(x) == part)//FP树挖掘频繁项
    }.map { case (ranks, count) =>
      new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
    }
  }
// add FP-Trees增加一条事务数据
  /** Adds a transaction with count. */
  def add(t: Iterable[T], count: Long = 1L): this.type = {
    require(count > 0)
    var curr = root
    curr.count += count
    t.foreach { item =>
      val summary = summaries.getOrElseUpdate(item, new Summary)
      summary.count += count
      val child = curr.children.getOrElseUpdate(item, {
        val newNode = new Node(curr)
        newNode.item = item
        summary.nodes += newNode
        newNode
      })
      child.count += count
      curr = child
    }
    this
  }
// merge FP-Trees合并
  /** Merges another FP-Tree. */
  def merge(other: FPTree[T]): this.type = {
    other.transactions.foreach { case (t, c) =>
      add(t, c)
    }
    this
  }
// extract FP-Trees挖掘,返回所有频繁项集
  /** Extracts all patterns with valid suffix and minimum count. */
  def extract(
      minCount: Long,
      validateSuffix: T => Boolean = _ => true): Iterator[(List[T], Long)] = {
    summaries.iterator.flatMap { case (item, summary) =>
      if (validateSuffix(item) && summary.count >= minCount) {
        Iterator.single((item :: Nil, summary.count)) ++
          project(item).extract(minCount).map { case (t, c) =>
            (item :: t, c)
          }
      } else {
        Iterator.empty
      }
    }
  }
vieyahn2017 commented 5 years ago

源码 https://github.com/apache/spark/blob/25bcf59b3b566b77bfc8a40a4f4253b81f340aa4/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala