JIRA description
A simple Python wrapper and doctests need to be written for Association Rules. The relevant method is FPGrowthModel.generateAssociationRules. The code will likely live in fpm.py.
Scala API JIRA https://issues.apache.org/jira/browse/SPARK-8877
https://github.com/apache/spark/pull/7271
+class FPGrowthModel[Item: ClassTag](val freqItemsets: RDD[FreqItemset[Item]]) extends Serializable {
+ /**
+ * Generates association rules for the [[Item]]s in [[freqItemsets]].
+ * @param confidence minimal confidence of the rules produced
+ */
+ def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = {
+ val associationRules = new AssociationRules(confidence)
+ associationRules.run(freqItemsets)
+ }
+}
Test
+ test("FP-Growth String type association rule generation") {
+ val transactions = Seq(
+ "r z h k p",
+ "z y x w v u t s",
+ "s x o n r",
+ "x z y m t s q e",
+ "z",
+ "x z y r q t p")
+ .map(_.split(" "))
+ val rdd = sc.parallelize(transactions, 2).cache()
+
+ /* Verify results using the `R` code:
+ transactions = as(sapply(
+ list("r z h k p",
+ "z y x w v u t s",
+ "s x o n r",
+ "x z y m t s q e",
+ "z",
+ "x z y r q t p"),
+ FUN=function(x) strsplit(x," ",fixed=TRUE)),
+ "transactions")
+ ars = apriori(transactions,
+ parameter = list(support = 0.0, confidence = 0.5, target="rules", minlen=2))
+ arsDF = as(ars, "data.frame")
+ arsDF$support = arsDF$support * length(transactions)
+ names(arsDF)[names(arsDF) == "support"] = "freq"
+ > nrow(arsDF)
+ [1] 23
+ > sum(arsDF$confidence == 1)
+ [1] 23
+ */
+ val rules = (new FPGrowth())
+ .setMinSupport(0.5)
+ .setNumPartitions(2)
+ .run(rdd)
+ .generateAssociationRules(0.9)
+ .collect()
+
+ assert(rules.size === 23)
+ assert(rules.count(rule => math.abs(rule.confidence - 1.0D) < 1e-6) == 23)
+ }
+
Objective: wrap AssociationRules in Python.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.fpm
import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
import org.apache.spark.rdd.RDD
/**
* :: Experimental ::
*
* Generates association rules from a [[RDD[FreqItemset[Item]]]. This method only generates
* association rules which have a single item as the consequent.
*/
@Experimental
class AssociationRules private[fpm] (
private var minConfidence: Double) extends Logging with Serializable {
/**
* Constructs a default instance with default parameters {minConfidence = 0.8}.
*/
def this() = this(0.8)
/**
* Sets the minimal confidence (default: `0.8`).
*/
def setMinConfidence(minConfidence: Double): this.type = {
require(minConfidence >= 0.0 && minConfidence <= 1.0)
this.minConfidence = minConfidence
this
}
/**
* Computes the association rules with confidence above [[minConfidence]].
* @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
* @return a [[Set[Rule[Item]]] containing the assocation rules.
*/
def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]] = {
// For candidate rule X => Y, generate (X, (Y, freq(X union Y)))
val candidates = freqItemsets.flatMap { itemset =>
val items = itemset.items
items.flatMap { item =>
items.partition(_ == item) match {
case (consequent, antecedent) if !antecedent.isEmpty =>
Some((antecedent.toSeq, (consequent.toSeq, itemset.freq)))
case _ => None
}
}
}
// Join to get (X, ((Y, freq(X union Y)), freq(X))), generate rules, and filter by confidence
candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
.map { case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
new Rule(antecendent.toArray, consequent.toArray, freqUnion, freqAntecedent)
}.filter(_.confidence >= minConfidence)
}
def run[Item](freqItemsets: JavaRDD[FreqItemset[Item]]): JavaRDD[Rule[Item]] = {
val tag = fakeClassTag[Item]
run(freqItemsets.rdd)(tag)
}
}
object AssociationRules {
/**
* :: Experimental ::
*
* An association rule between sets of items.
* @param antecedent hypotheses of the rule
* @param consequent conclusion of the rule
* @tparam Item item type
*/
@Experimental
class Rule[Item] private[fpm] (
val antecedent: Array[Item],
val consequent: Array[Item],
freqUnion: Double,
freqAntecedent: Double) extends Serializable {
def confidence: Double = freqUnion.toDouble / freqAntecedent
require(antecedent.toSet.intersect(consequent.toSet).isEmpty, {
val sharedItems = antecedent.toSet.intersect(consequent.toSet)
s"A valid association rule must have disjoint antecedent and " +
s"consequent but ${sharedItems} is present in both."
})
}
}
A simple Python wrapper and doctests need to be written for Association Rules. The relevant method is FPGrowthModel.generateAssociationRules. The code will likely live in fpm.py.
The code is here:
/Users/ken/spark/master/spark/python/pyspark/mllib
The Python API should look something like this.
import numpy
from numpy import array
from collections import namedtuple
from pyspark import SparkContext
from pyspark.rdd import ignore_unicode_prefix
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc
__all__ = ['FPGrowth', 'FPGrowthModel']
@inherit_doc
@ignore_unicode_prefix
class FPGrowthModel(JavaModelWrapper):
    """
    .. note:: Experimental

    A FP-Growth model for mining frequent itemsets
    using the Parallel FP-Growth algorithm.

    >>> data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]]
    >>> rdd = sc.parallelize(data, 2)
    >>> model = FPGrowth.train(rdd, 0.6, 2)
    >>> sorted(model.freqItemsets().collect())
    [FreqItemset(items=[u'a'], freq=4), FreqItemset(items=[u'c'], freq=3), ...
    """
    def freqItemsets(self):
        """
        Returns the frequent itemsets of this model.
        """
        return self.call("getFreqItemsets").map(lambda x: (FPGrowth.FreqItemset(x[0], x[1])))

    def generateAssociationRules(self, confidence):
        """
        Generates association rules for the [[Item]]s in [[freqItemsets]].

        :param confidence: minimal confidence of the rules produced
        """
        # call an AssociationRules wrapper here (no Python-side AssociationRules class exists yet)
        associationRules = AssociationRules(confidence)
        return associationRules.run(self.freqItemsets())
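A simpler route may be to skip a Python-side AssociationRules class entirely and forward the call through JavaModelWrapper.call, letting the JVM model's generateAssociationRules do the work and _java2py convert the result back to a Python RDD. A hedged sketch (whether the returned Rule objects survive the SerDe round trip without an extra wrapper, like getFreqItemsets uses, still needs to be checked):

from pyspark.mllib.common import JavaModelWrapper


class FPGrowthModel(JavaModelWrapper):
    """Sketch only; freqItemsets() and the doctests above would stay unchanged."""

    def generateAssociationRules(self, confidence):
        """
        Generates association rules for the items in freqItemsets.

        :param confidence: minimal confidence of the rules produced
        """
        # Forwards to FPGrowthModel.generateAssociationRules(Double) on the JVM model;
        # callJavaFunc/_py2java handle the argument and _java2py should hand back a Python RDD.
        return self.call("generateAssociationRules", float(confidence))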
JavaModelWrapper
class JavaModelWrapper(object):
    """
    Wrapper for the model in JVM
    """
    def __init__(self, java_model):
        self._sc = SparkContext._active_spark_context
        self._java_model = java_model

    def __del__(self):
        self._sc._gateway.detach(self._java_model)

    def call(self, name, *a):
        """Call method of java_model"""
        return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
How does gateway detach work?
This function is defined in Py4J:
https://github.com/bartdag/py4j/blob/master/py4j-python/src/py4j/java_gateway.py
def _detach(self):
_garbage_collect_object(self._gateway_client, self._target_id)
http://py4j.sourceforge.net/py4j_java_gateway.html
detach(java_object) Makes the Java Gateway dereference this object.
The equivalent of this method is called when a JavaObject instance is garbage collected on the Python side. This method, or gc.collect() should still be invoked when memory is limited or when too many objects are created on the Java side.
Parameters: java_object – The JavaObject instance to dereference (free) on the Java side.
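A small illustration of explicit detach with a bare Py4J gateway (a sketch, not PySpark code; it assumes a GatewayServer is already running on the JVM side):

from py4j.java_gateway import JavaGateway

gateway = JavaGateway()                    # connects to the default GatewayServer port
jlist = gateway.jvm.java.util.ArrayList()  # any JavaObject handle works
# ... use the object ...
gateway.detach(jlist)                      # dereference it on the Java side eagerly;
                                           # the same call JavaModelWrapper.__del__ makes via _gateway.detach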
https://github.com/apache/spark/blob/master/python/pyspark/mllib/common.py
import sys
if sys.version >= '3':
long = int
unicode = str
import py4j.protocol
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
from py4j.java_collections import ListConverter, JavaArray, JavaList
from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.sql import DataFrame, SQLContext
# Hack for support float('inf') in Py4j
_old_smart_decode = py4j.protocol.smart_decode
_float_str_mapping = {
'nan': 'NaN',
'inf': 'Infinity',
'-inf': '-Infinity',
}
def _new_smart_decode(obj):
if isinstance(obj, float):
s = str(obj)
return _float_str_mapping.get(s, s)
return _old_smart_decode(obj)
py4j.protocol.smart_decode = _new_smart_decode
_picklable_classes = [
'LinkedList',
'SparseVector',
'DenseVector',
'DenseMatrix',
'Rating',
'LabeledPoint',
]
# this will call the MLlib version of pythonToJava()
def _to_java_object_rdd(rdd):
""" Return an JavaRDD of Object by unpickling
It will convert each Python object into Java object by Pyrolite, whenever the
RDD is serialized in batch or not.
"""
rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True)
def _py2java(sc, obj):
""" Convert Python object into Java """
if isinstance(obj, RDD):
obj = _to_java_object_rdd(obj)
elif isinstance(obj, SparkContext):
obj = obj._jsc
elif isinstance(obj, list):
obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
elif isinstance(obj, JavaObject):
pass
elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
pass
else:
data = bytearray(PickleSerializer().dumps(obj))
obj = sc._jvm.SerDe.loads(data)
return obj
def _java2py(sc, r, encoding="bytes"):
if isinstance(r, JavaObject):
clsName = r.getClass().getSimpleName()
# convert RDD into JavaRDD
if clsName != 'JavaRDD' and clsName.endswith("RDD"):
r = r.toJavaRDD()
clsName = 'JavaRDD'
if clsName == 'JavaRDD':
jrdd = sc._jvm.SerDe.javaToPython(r)
return RDD(jrdd, sc)
if clsName == 'DataFrame':
return DataFrame(r, SQLContext(sc))
if clsName in _picklable_classes:
r = sc._jvm.SerDe.dumps(r)
elif isinstance(r, (JavaArray, JavaList)):
try:
r = sc._jvm.SerDe.dumps(r)
except Py4JJavaError:
pass # not pickable
if isinstance(r, (bytearray, bytes)):
r = PickleSerializer().loads(bytes(r), encoding=encoding)
return r
def callJavaFunc(sc, func, *args):
""" Call Java Function """
args = [_py2java(sc, a) for a in args]
return _java2py(sc, func(*args))
def callMLlibFunc(name, *args):
""" Call API in PythonMLLibAPI """
sc = SparkContext._active_spark_context
api = getattr(sc._jvm.PythonMLLibAPI(), name)
return callJavaFunc(sc, api, *args)
class JavaModelWrapper(object):
"""
Wrapper for the model in JVM
"""
def __init__(self, java_model):
self._sc = SparkContext._active_spark_context
self._java_model = java_model
def __del__(self):
self._sc._gateway.detach(self._java_model)
def call(self, name, *a):
"""Call method of java_model"""
return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
def inherit_doc(cls):
"""
A decorator that makes a class inherit documentation from its parents.
"""
for name, func in vars(cls).items():
# only inherit docstring for public functions
if name.startswith("_"):
continue
if not func.__doc__:
for parent in cls.__bases__:
parent_func = getattr(parent, name, None)
if parent_func and getattr(parent_func, "__doc__", None):
func.__doc__ = parent_func.__doc__
break
return cls
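To make the conversion path concrete, here is a rough sketch of the round trip callJavaFunc performs on arguments and return values (assumes an active SparkContext named sc, e.g. inside a pyspark shell):

from pyspark.mllib.common import _py2java, _java2py

jdouble = _py2java(sc, 0.9)                     # primitives pass straight through to Py4J
jrdd = _py2java(sc, sc.parallelize([1, 2, 3]))  # a Python RDD is pickled into a JavaRDD of objects
rdd_back = _java2py(sc, jrdd)                   # a JavaRDD comes back as a Python RDD of unpickled objects
print(rdd_back.collect())                       # [1, 2, 3]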
We can refer to this for wrapping a Java class, I think:
import sys
if sys.version >= "3":
from io import BytesIO
else:
from StringIO import StringIO
from py4j.java_gateway import Py4JJavaError
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
from pyspark.streaming import DStream
__all__ = ['FlumeUtils', 'utf8_decoder']
def utf8_decoder(s):
""" Decode the unicode as UTF-8 """
return s and s.decode('utf-8')
class FlumeUtils(object):
@staticmethod
def createStream(ssc, hostname, port,
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
enableDecompression=False,
bodyDecoder=utf8_decoder):
"""
Create an input stream that pulls events from Flume.
:param ssc: StreamingContext object
:param hostname: Hostname of the slave machine to which the flume data will be sent
:param port: Port of the slave machine to which the flume data will be sent
:param storageLevel: Storage level to use for storing the received objects
:param enableDecompression: Should netty server decompress input stream
:param bodyDecoder: A function used to decode body (default is utf8_decoder)
:return: A DStream object
"""
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
try:
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
.loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
helper = helperClass.newInstance()
jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
except Py4JJavaError as e:
if 'ClassNotFoundException' in str(e.java_exception):
FlumeUtils._printErrorMsg(ssc.sparkContext)
raise e
return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
@staticmethod
def createPollingStream(ssc, addresses,
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
maxBatchSize=1000,
parallelism=5,
bodyDecoder=utf8_decoder):
"""
Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
This stream will poll the sink for data and will pull events as they are available.
:param ssc: StreamingContext object
:param addresses: List of (host, port)s on which the Spark Sink is running.
:param storageLevel: Storage level to use for storing the received objects
:param maxBatchSize: The maximum number of events to be pulled from the Spark sink
in a single RPC call
:param parallelism: Number of concurrent requests this stream should send to the sink.
Note that having a higher number of requests concurrently being pulled
will result in this stream using more threads
:param bodyDecoder: A function used to decode body (default is utf8_decoder)
:return: A DStream object
"""
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
hosts = []
ports = []
for (host, port) in addresses:
hosts.append(host)
ports.append(port)
try:
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
.loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
helper = helperClass.newInstance()
jstream = helper.createPollingStream(
ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
except Py4JJavaError as e:
if 'ClassNotFoundException' in str(e.java_exception):
FlumeUtils._printErrorMsg(ssc.sparkContext)
raise e
return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
@staticmethod
def _toPythonDStream(ssc, jstream, bodyDecoder):
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)
def func(event):
headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
headers = {}
strSer = UTF8Deserializer()
for i in range(0, read_int(headersBytes)):
key = strSer.loads(headersBytes)
value = strSer.loads(headersBytes)
headers[key] = value
body = bodyDecoder(event[1])
return (headers, body)
return stream.map(func)
@staticmethod
def _printErrorMsg(sc):
print("""
________________________________________________________________________________________________
Spark Streaming's Flume libraries not found in class path. Try one of the following.
1. Include the Flume library and its dependencies with in the
spark-submit command as
$ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
Then, include the jar in the spark-submit command as
$ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
________________________________________________________________________________________________
""" % (sc.version, sc.version))
import time
from datetime import datetime
import traceback
from pyspark import SparkContext, RDD
class TransformFunction(object):
"""
This class wraps a function RDD[X] -> RDD[Y] that was passed to
DStream.transform(), allowing it to be called from Java via Py4J's
callback server.
Java calls this function with a sequence of JavaRDDs and this function
returns a single JavaRDD pointer back to Java.
"""
_emptyRDD = None
def __init__(self, ctx, func, *deserializers):
self.ctx = ctx
self.func = func
self.deserializers = deserializers
self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
def rdd_wrapper(self, func):
self._rdd_wrapper = func
return self
def call(self, milliseconds, jrdds):
try:
if self.ctx is None:
self.ctx = SparkContext._active_spark_context
if not self.ctx or not self.ctx._jsc:
# stopped
return
# extend deserializers with the first one
sers = self.deserializers
if len(sers) < len(jrdds):
sers += (sers[0],) * (len(jrdds) - len(sers))
rdds = [self._rdd_wrapper(jrdd, self.ctx, ser) if jrdd else None
for jrdd, ser in zip(jrdds, sers)]
t = datetime.fromtimestamp(milliseconds / 1000.0)
r = self.func(t, *rdds)
if r:
return r._jrdd
except Exception:
traceback.print_exc()
def __repr__(self):
return "TransformFunction(%s)" % self.func
class Java:
implements = ['org.apache.spark.streaming.api.python.PythonTransformFunction']
class TransformFunctionSerializer(object):
"""
This class implements a serializer for PythonTransformFunction Java
objects.
This is necessary because the Java PythonTransformFunction objects are
actually Py4J references to Python objects and thus are not directly
serializable. When Java needs to serialize a PythonTransformFunction,
it uses this class to invoke Python, which returns the serialized function
as a byte array.
"""
def __init__(self, ctx, serializer, gateway=None):
self.ctx = ctx
self.serializer = serializer
self.gateway = gateway or self.ctx._gateway
self.gateway.jvm.PythonDStream.registerSerializer(self)
def dumps(self, id):
try:
func = self.gateway.gateway_property.pool[id]
return bytearray(self.serializer.dumps((func.func, func.deserializers)))
except Exception:
traceback.print_exc()
def loads(self, data):
try:
f, deserializers = self.serializer.loads(bytes(data))
return TransformFunction(self.ctx, f, *deserializers)
except Exception:
traceback.print_exc()
def __repr__(self):
return "TransformFunctionSerializer(%s)" % self.serializer
class Java:
implements = ['org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer']
def rddToFileName(prefix, suffix, timestamp):
"""
Return string prefix-time(.suffix)
>>> rddToFileName("spark", None, 12345678910)
'spark-12345678910'
>>> rddToFileName("spark", "tmp", 12345678910)
'spark-12345678910.tmp'
"""
if isinstance(timestamp, datetime):
seconds = time.mktime(timestamp.timetuple())
timestamp = int(seconds * 1000) + timestamp.microsecond // 1000
if suffix is None:
return prefix + "-" + str(timestamp)
else:
return prefix + "-" + str(timestamp) + "." + suffix
if __name__ == "__main__":
import doctest
(failure_count, test_count) = doctest.testmod()
if failure_count:
exit(-1)
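For the doctests requested in the JIRA, fpm.py would presumably be wired up the same way as the doctest runner above and the other pyspark.mllib modules: run doctest.testmod() over the module globals with a local SparkContext injected. A sketch, assuming the usual _test() convention:

def _test():
    import doctest
    from pyspark import SparkContext
    import pyspark.mllib.fpm
    globs = pyspark.mllib.fpm.__dict__.copy()
    # The doctests reference `sc`, so inject a local context into the doctest globals.
    globs['sc'] = SparkContext('local[4]', 'PythonTest')
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()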
Py4j examples
http://py4j.sourceforge.net/py4j_java_gateway.html#examples
4.1.1.1. Examples
Using the jvm property:
>>> gateway = JavaGateway()
>>> jvm = gateway.jvm
>>> l = jvm.java.util.ArrayList()
>>> l.append(10)
>>> l.append(1)
>>> jvm.java.util.Collections.sort(l)
>>> l
[1, 10]
>>> l.append(5)
>>> l.sort()
>>> l
[1, 5, 10]
Using auto_field:
First we declare a class that has a field AND a method called member:
package py4j.examples;
public class ExampleWithField {
public int member = 1;
public String member() {
return "Hello World";
}
}
Then we play with the class using the two possible values of auto_field:
>>> java_gateway = JavaGateway() # auto_field = False
>>> example = java_gateway.jvm.py4j.examples.ExampleWithField()
>>> example.member()
u'Hello World'
>>> get_field(example,'member')
1
>>> java_gateway2 = JavaGateway(auto_field=True)
>>> example2 = java_gateway2.jvm.py4j.examples.ExampleWithField()
>>> example2.member
1
>>> get_method(example2,'member')()
u'Hello World'
Use this function, since all RDDs are JavaRDDs from the Python side:
def run[Item](freqItemsets: JavaRDD[FreqItemset[Item]]): JavaRDD[Rule[Item]] = {
val tag = fakeClassTag[Item]
run(freqItemsets.rdd)(tag)
}
The Scala code for the Python API should go here.
Maybe put the code in this package:
package org.apache.spark.mllib.api.python
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
import org.apache.spark.rdd.RDD
/**
* A Wrapper of FPGrowthModel to provide helper method for Python
*/
private[python] class FPGrowthModelWrapper(model: FPGrowthModel[Any])
extends FPGrowthModel(model.freqItemsets) {
def getFreqItemsets: RDD[Array[Any]] = {
SerDe.fromTuple2RDD(model.freqItemsets.map(x => (x.javaItems, x.freq)))
}
}
/**
* Java stub for Python mllib FPGrowth.train(). This stub returns a handle
* to the Java object instead of the content of the Java object. Extra care
* needs to be taken in the Python code to ensure it gets freed on exit; see
* the Py4J documentation.
*/
def trainFPGrowthModel(
data: JavaRDD[java.lang.Iterable[Any]],
minSupport: Double,
numPartitions: Int): FPGrowthModel[Any] = {
val fpg = new FPGrowth()
.setMinSupport(minSupport)
.setNumPartitions(numPartitions)
val model = fpg.run(data.rdd.map(_.asScala.toArray))
new FPGrowthModelWrapper(model)
}
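On the Python side, FPGrowth.train reaches this stub through callMLlibFunc and keeps the returned FPGrowthModelWrapper handle as the model; the sketch below mirrors the train() method already in fpm.py:

from pyspark.mllib.common import callMLlibFunc


class FPGrowth(object):
    """Sketch of the Python entry point; FPGrowthModel is the wrapper class sketched earlier."""

    @classmethod
    def train(cls, data, minSupport=0.3, numPartitions=-1):
        # Looks up trainFPGrowthModel on PythonMLLibAPI, converts `data` to a JavaRDD via
        # _py2java, and wraps the returned Java model handle in the Python model class.
        model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartitions))
        return FPGrowthModel(model)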
Wrap this!!
/**
* :: Experimental ::
*
* Model trained by [[FPGrowth]], which holds frequent itemsets.
* @param freqItemsets frequent itemset, which is an RDD of [[FreqItemset]]
* @tparam Item item type
*/
@Experimental
class FPGrowthModel[Item: ClassTag](val freqItemsets: RDD[FreqItemset[Item]]) extends Serializable {
/**
* Generates association rules for the [[Item]]s in [[freqItemsets]].
* @param confidence minimal confidence of the rules produced
*/
def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = {
val associationRules = new AssociationRules(confidence)
associationRules.run(freqItemsets)
}
}
Load the Java package:
https://github.com/apache/spark/blob/488bad319a70975733e83c83490240a70beb0c90/python/pyspark/java_gateway.py#L116
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
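A minimal illustration of what that java_import line does (a sketch; in PySpark the gateway is created by launch_gateway() in java_gateway.py rather than by hand):

from py4j.java_gateway import JavaGateway, java_import

gateway = JavaGateway()  # assumes a GatewayServer is already listening on the JVM side
# Makes everything under org.apache.spark.mllib.api.python reachable without the full package path,
# e.g. gateway.jvm.PythonMLLibAPI() instead of spelling out the whole package.
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
api = gateway.jvm.PythonMLLibAPI()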
FPGrowth
package org.apache.spark.mllib.fpm
import java.{util => ju}
import java.lang.{Iterable => JavaIterable}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.spark.{HashPartitioner, Logging, Partitioner, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.mllib.fpm.FPGrowth._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
/**
* :: Experimental ::
*
* Model trained by [[FPGrowth]], which holds frequent itemsets.
* @param freqItemsets frequent itemset, which is an RDD of [[FreqItemset]]
* @tparam Item item type
*
* @since 1.3.0
*/
@Experimental
class FPGrowthModel[Item: ClassTag](val freqItemsets: RDD[FreqItemset[Item]]) extends Serializable {
/**
* Generates association rules for the [[Item]]s in [[freqItemsets]].
* @param confidence minimal confidence of the rules produced
* @since 1.5.0
*/
def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = {
val associationRules = new AssociationRules(confidence)
associationRules.run(freqItemsets)
}
}
/**
* :: Experimental ::
*
* A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in
* [[http://dx.doi.org/10.1145/1454008.1454027 Li et al., PFP: Parallel FP-Growth for Query
* Recommendation]]. PFP distributes computation in such a way that each worker executes an
* independent group of mining tasks. The FP-Growth algorithm is described in
* [[http://dx.doi.org/10.1145/335191.335372 Han et al., Mining frequent patterns without candidate
* generation]].
*
* @param minSupport the minimal support level of the frequent pattern, any pattern appears
* more than (minSupport * size-of-the-dataset) times will be output
* @param numPartitions number of partitions used by parallel FP-growth
*
* @see [[http://en.wikipedia.org/wiki/Association_rule_learning Association rule learning
* (Wikipedia)]]
*
* @since 1.3.0
*/
@Experimental
class FPGrowth private (
private var minSupport: Double,
private var numPartitions: Int) extends Logging with Serializable {
/**
* Constructs a default instance with default parameters {minSupport: `0.3`, numPartitions: same
* as the input data}.
*
* @since 1.3.0
*/
def this() = this(0.3, -1)
/**
* Sets the minimal support level (default: `0.3`).
*
* @since 1.3.0
*/
def setMinSupport(minSupport: Double): this.type = {
this.minSupport = minSupport
this
}
/**
* Sets the number of partitions used by parallel FP-growth (default: same as input data).
*
* @since 1.3.0
*/
def setNumPartitions(numPartitions: Int): this.type = {
this.numPartitions = numPartitions
this
}
/**
* Computes an FP-Growth model that contains frequent itemsets.
* @param data input data set, each element contains a transaction
* @return an [[FPGrowthModel]]
*
* @since 1.3.0
*/
def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item] = {
if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("Input data is not cached.")
}
val count = data.count()
val minCount = math.ceil(minSupport * count).toLong
val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
val partitioner = new HashPartitioner(numParts)
val freqItems = genFreqItems(data, minCount, partitioner)
val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
new FPGrowthModel(freqItemsets)
}
def run[Item, Basket <: JavaIterable[Item]](data: JavaRDD[Basket]): FPGrowthModel[Item] = {
implicit val tag = fakeClassTag[Item]
run(data.rdd.map(_.asScala.toArray))
}
/**
* Generates frequent items by filtering the input data using minimal support level.
* @param minCount minimum count for frequent itemsets
* @param partitioner partitioner used to distribute items
* @return array of frequent pattern ordered by their frequencies
*/
private def genFreqItems[Item: ClassTag](
data: RDD[Array[Item]],
minCount: Long,
partitioner: Partitioner): Array[Item] = {
data.flatMap { t =>
val uniq = t.toSet
if (t.size != uniq.size) {
throw new SparkException(s"Items in a transaction must be unique but got ${t.toSeq}.")
}
t
}.map(v => (v, 1L))
.reduceByKey(partitioner, _ + _)
.filter(_._2 >= minCount)
.collect()
.sortBy(-_._2)
.map(_._1)
}
/**
* Generate frequent itemsets by building FP-Trees, the extraction is done on each partition.
* @param data transactions
* @param minCount minimum count for frequent itemsets
* @param freqItems frequent items
* @param partitioner partitioner used to distribute transactions
* @return an RDD of (frequent itemset, count)
*/
private def genFreqItemsets[Item: ClassTag](
data: RDD[Array[Item]],
minCount: Long,
freqItems: Array[Item],
partitioner: Partitioner): RDD[FreqItemset[Item]] = {
val itemToRank = freqItems.zipWithIndex.toMap
data.flatMap { transaction =>
genCondTransactions(transaction, itemToRank, partitioner)
}.aggregateByKey(new FPTree[Int], partitioner.numPartitions)(
(tree, transaction) => tree.add(transaction, 1L),
(tree1, tree2) => tree1.merge(tree2))
.flatMap { case (part, tree) =>
tree.extract(minCount, x => partitioner.getPartition(x) == part)
}.map { case (ranks, count) =>
new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
}
}
/**
* Generates conditional transactions.
* @param transaction a transaction
* @param itemToRank map from item to their rank
* @param partitioner partitioner used to distribute transactions
* @return a map of (target partition, conditional transaction)
*/
private def genCondTransactions[Item: ClassTag](
transaction: Array[Item],
itemToRank: Map[Item, Int],
partitioner: Partitioner): mutable.Map[Int, Array[Int]] = {
val output = mutable.Map.empty[Int, Array[Int]]
// Filter the basket by frequent items pattern and sort their ranks.
val filtered = transaction.flatMap(itemToRank.get)
ju.Arrays.sort(filtered)
val n = filtered.length
var i = n - 1
while (i >= 0) {
val item = filtered(i)
val part = partitioner.getPartition(item)
if (!output.contains(part)) {
output(part) = filtered.slice(0, i + 1)
}
i -= 1
}
output
}
}
/**
* :: Experimental ::
*
* @since 1.3.0
*/
@Experimental
object FPGrowth {
/**
* Frequent itemset.
* @param items items in this itemset. Java users should call [[FreqItemset#javaItems]] instead.
* @param freq frequency
* @tparam Item item type
*
* @since 1.3.0
*/
class FreqItemset[Item](val items: Array[Item], val freq: Long) extends Serializable {
/**
* Returns items in a Java List.
*
* @since 1.3.0
*/
def javaItems: java.util.List[Item] = {
items.toList.asJava
}
}
}
https://github.com/apache/spark
JIRA https://issues.apache.org/jira/browse/SPARK-8855?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20priority%20%3D%20Minor%20ORDER%20BY%20key%20DESC
The work log is important for increasing what participants get out of this event, because the work log makes the feedback from mentors richer.
There will be times when you do a retrospective with your mentor based on the work log. The retrospective covers the following.
The mentor gives feedback such as the following. This is possible because the mentor observes the participant's actions from a different viewpoint than the participant and has experience. (Asking in advance, "please give me this kind of feedback!" before the retrospective makes it more effective, so try it.)
In this way, the retrospective is the activity of organizing the participant's work together from the mentor's viewpoint and feeding it into the participant's future actions. So please keep logs with a "log it for now" mindset rather than judging whether something is worth logging "for the participant," because things the participant does not consider important can still matter from the mentor's viewpoint.
Log at times like the following.
Log the following items. Except for "notes," one short line is enough so that logging does not get in the way of the work. Notes are useful for the work itself, so write as much as you need.
Append log entries as comments. The template is as follows.
Example 1 (no notes):
Example 2:
I wonder if some required files are missing.