Majabigwaduce

A framework to allow MapReduce applications to use Akka actors

Majabigwaduce* (aka Akka/MapReduce) is a framework for implementing map-reduce using Scala and Akka actors. I tried to use only the intrinsic notion of map and reduce. Thus, it is not exactly like Hadoop's map-reduce (although it is similar).

Why would anyone want to do map-reduce using actors? It's a good question. For me, it arose initially because I needed an example of using actors for the class I was teaching on Scala and Big Data. I also wanted to ensure that the students understood the essence of map-reduce rather than some derived version of it. Of course, it turns out that it's a perfect application for actors and indeed demonstrates many of the proper techniques to be used when programming with (Akka) actors.

* Majabigwaduce was the Native American name for the site of the battle of Penobscot Bay, Maine, in 1779: see the Penobscot Expedition.

API

Quick link to the API [TODO: This needs to be fixed]

High-level API

DataDefinition

Majabigwaduce has a high-level API, something like that used by Spark. The classes for this API can be found in the dd package. It is based on the concept of DataDefinition, essentially a lazy, partitionable, map of key-value pairs. DataDefinition is a trait with two concrete sub-classes:

A DataDefinition (in these cases a LazyDD) is normally created with a statement such as:

val dd = DataDefinition(map, partitions)

where map is either a Map[K,V] or a Seq[(K,V)]; or

val dd = DataDefinition(list, f, partitions)

where list is a Seq[V] and where f is a function of type V=>K (the mapper function).

In all cases, partitions represents the desired number of partitions for the data definition, but can be omitted, in which case it will default to 2.
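For example, here is a sketch of both creation forms described above (the data, and the key type chosen by the mapper function, are hypothetical; the dd package is assumed to be imported):

import java.net.URI

// From a Map[K,V] (here K = String, V = Int) with an explicit partition count of 3:
val dd1 = DataDefinition(Map("bbc.com" -> 23, "cnn.com" -> 42), 3)

// From a Seq[V] plus a mapper function of type V => K (partitions defaults to 2):
val dd2 = DataDefinition(Seq("http://www.bbc.com/doc1", "http://www.cnn.com/doc2"), (w: String) => new URI(w).getHost)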

There are three types of transformation function currently supported:

There are three types of "action" function:

An additional type DDContext is used implicitly when calling the apply methods of the DataDefinition object.

For an example of using this higher-level API, please see the Matrix class. Because this class is useful in its own right, it can be found in the main source area under the matrix package.

Functional Map-Reduce (mid-level API)

The classes for this API (and anything lower) can be found in the core package. The set of Master classes (lowest-level API) can be used by applications exactly as described below. However, there is a more convenient, functional form based on the trait MapReduce which is defined thus:

trait MapReduce[T, K2, V2] extends (Seq[T] => Future[Map[K2, V2]]) {
  def compose[K3, V3](mr: MapReduce[(K2, V2), K3, V3]): MapReduce[T, K3, V3] = MapReduceComposed(this, mr)
  def compose[S >: V2](r: Reduce[V2, S])(implicit executionContext: ExecutionContext): Seq[T] => Future[S] =
    ts => for (v2K2m <- apply(ts); s = r.apply(v2K2m)) yield s
  def ec: ExecutionContext
}

This trait casts the map-reduce process as a simple function: one which takes a Seq[T] and results in a (future of) Map[K2,V2], where T is either V1 in the case of the first stage of a map-reduce pipeline or (Kn,Vn) in the case of the subsequent (nth) stage. There are four case classes which implement this trait (and which should be specified by the application programmer); they correspond to the four forms of Master described under the lowest-level API below: the "first" and pipe variations, each in plain and "fold" form. MapReduceFirstFold, MapReducePipe, and MapReducePipeFold all appear in the examples later in this document.

Additionally, there is the MapReduceComposed case class which is created by invoking the compose method. A pipeline of map-reduce stages can thus be composed by using the compose method of MapReduce. Such a pipeline may be (optionally) terminated by composing with a Reduce instance which combines the values of the final Map[Kn,Vn] into a single S value (where S is a super-class of Vn).

Thus, a pipeline in functional form is a closure which captures all the functions (and their parameters) that are in scope at the time the pipeline is defined.
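For instance, here is a sketch of composing two placeholder stages, based purely on the trait definition above (the stages themselves would be instances of the case classes just mentioned):

import java.net.URI
import scala.concurrent.Future

// Two placeholder stages: the first maps strings to (URI, Seq[String]) pairs,
// the second maps those pairs to (URI, Int) pairs.
val stage1: MapReduce[String, URI, Seq[String]] = ???
val stage2: MapReduce[(URI, Seq[String]), URI, Int] = ???

// compose yields another MapReduce, which is itself a Seq[String] => Future[Map[URI, Int]]:
val pipeline: MapReduce[String, URI, Int] = stage1 compose stage2
val eventualResult: Future[Map[URI, Int]] = pipeline(Seq("http://www.bbc.com/doc1"))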

See the CountWords example (below).

Lowest-level API

In order for a calculation to be performed in parallel, the complete calculation must be capable of being broken up into smaller parts which can each be implemented independently. These parallel calculations are performed in the reduce phase of map-reduce, while the map phase is responsible for breaking the work into these independent parts. In order that the results from the reduce phase can be collated and/or aggregated, it is usually convenient for each portion of the calculation to be identified by a unique key (we will call the type of these keys K2). The data required for each portion typically consists of many similar elements. We will call the type of these elements W. Thus, the natural intermediate data structure (for the shuffle phase, see below) which results from the map stage and is used as input to the reduce stage is:

Map[K2, Seq[W]]

Thus, the first job of designing an application to use map-reduce is to figure out the types K2 and W. If you are chaining map-reduce operations together, then the input to stage N+1 will be of the same form as the output of stage N. Thus, in general, the input to the map-reduce process is a map of key-value pairs. We call the type of the key K1 and the type of the value V1. So, the input to the map stage is, in general:

Map[K1,V1]

For the first stage, there is usually no appropriate key so instead we pass in a message of the following form (which is more or less equivalent to Map[Unit,V1]):

Seq[V1]

The reduction stage, as we have already seen, starts with information in the form of Map[K2,Seq[W]] and the work is divided up and sent to each of the reducers. Thus, each reducer takes as input (via a message) the following tuple:

(K2,Seq[W])

The result of each reduction is a tuple of the following form:

(K2,V2)

where V2 is the aggregate of all the W elements.

Note that the reason that W is not called V2 and V2 is not called V3 is because W is an internal type. It is not a factor in the incoming or outgoing messages from the Master.
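For instance (anticipating the CountWords example below, and using made-up values), with K2 = URI (the server) and W = String (the content of one document from that server), the intermediate structure might look like this:

import java.net.URI

// A hypothetical intermediate (shuffle) structure: K2 = URI, W = String.
val intermediate: Map[URI, Seq[String]] = Map(
  new URI("http://www.bbc.com") -> Seq("contents of doc1", "contents of doc2"),
  new URI("http://www.cnn.com") -> Seq("contents of doc2")
)
// Each reducer receives one (K2, Seq[W]) tuple and produces one (K2, V2) tuple.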

Of course, it's possible that there are insufficient reducers available for each of the keys. The way this project deals with that situation is simply to wrap around and start sending messages to the available actors again. In general, the so-called shuffle phase which precedes the reduce phase is able to pick and choose how best to match a key to a particular reducer. This might be based on the locality of the data referenced by the values in the sequence, or on some other criterion with a view to load-balancing. However, this project does not currently make any such decisions, so the shuffle phase is really non-existent: messages (one per key) are simply sent out to the reducers in sequence.
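The following sketch (not the actual implementation) illustrates that round-robin dispatch, reusing intermediate from the sketch above; Intermediate is the reducer's input message, defined under Reducer below:

import akka.actor.ActorRef

// Round-robin dispatch: the message for the i-th key goes to reducer number i % n.
val reducers: Seq[ActorRef] = ???
for (((k, ws), i) <- intermediate.toSeq.zipWithIndex)
  reducers(i % reducers.size) ! Intermediate(k, ws)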

Low-level Details

Master

The Master (or one of its three siblings) is the only class with which an application needs to be concerned. The Master, itself an actor, creates a mapper and a number of reducers as appropriate at startup, and destroys them at the end. The input message and the constructor format differ slightly according to which form of the Master (see below) you are employing.

Generally, there are five polymorphic types which describe the definition of Master: K1, V1, K2, W, and V2. Of these, W is not involved in messages going to or from the master; it is internal only. And, again generally, the constructor for the Master takes the following parameters:

where

There are actually four Master types to accommodate different situations. The first map-reduce stage in a pipeline (as mentioned above) does not involve K1; therefore, two of the master types are of this "first" type. Next, there is a difference between the "pure" reducers and the fold-reducers, which requires that these be treated separately (see the section on Reducer below). This creates another pairing of master forms: the "fold" variations. Thus, we have four forms of Master all told:

The "fold" variations require the z parameter, whereas the other variations do not. Thus, the non-"fold" variations require that Z2 be a super-type of W (as required by reduceLeft).

The "first" variations do not require a K1 to be defined (it defaults to Unit) and see below in Mapper for the difference in input message types.

The input message type for the "first" variations is: Seq[V1] while the input message type for the non-"first" variations is Map[K1,V1].

The output message type is always Response[K2,V2]. The Response type is defined thus:

case class Response[K,V](left: Map[K,Throwable], right: Map[K,V]) {
  def size = right.size
}

where K represents K2 and V represents V2. As you can see, the results of applying the reductions are preserved whether they are successes or failures. The right value of the response is the collation of the successful reductions, while the left value represents all the exceptions that were thrown (with their corresponding key).
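Here is a sketch of consuming a Response, assuming K2 = URI and V2 = Int as in the CountWords example below (the report function itself is hypothetical):

import akka.event.LoggingAdapter
import java.net.URI

def report(response: Response[URI, Int])(implicit logger: LoggingAdapter): Unit = {
  for ((k, x) <- response.left) logger.error(x, s"reduction failed for $k") // failed reductions
  for ((k, v) <- response.right) logger.info(s"$k: $v")                     // successful reductions
}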

Note that each of the classes described above also provides an apply method in its companion object which you can use if your mapper function is of the form (K1,V1)=>(K2,W) (that's to say, without the Try).

Mapper

The Mapper class is a sub-class of Actor. In general, the Mapper takes the following polymorphic types: [K1,V1,K2,W].

The constructor takes a function f of type (K1,V1)=>Try[(K2,W)], that's to say it is a function which transforms a (K1,V1) tuple into a Try of (K2,W) tuple.
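For instance, a first-stage mapper (where K1 = Unit, see below) might look like this sketch, with V1 = String, K2 = URI, and W = String; readContent is a hypothetical helper, not part of the framework:

import java.net.URI
import scala.util.Try

def readContent(u: URI): String = ??? // hypothetical: fetch the document at u

def f(k1: Unit, w: String): Try[(URI, String)] = Try {
  val u = new URI(w)
  // key by the server, value is the document's content
  new URI(u.getScheme, u.getHost, null, null) -> readContent(u)
}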

The incoming message is of the form: KeyValueSeq[K,V] where KeyValueSeq is essentially a wrapper around the input (but in sequence/tuple form) and is defined thus:

case class KeyValueSeq[K, V](m: Seq[(K,V)])

Where, in practice, K=K1 and V=V1. For first-stage map-reduce processes, K1 is assumed to be Unit. Therefore, you can see the reason for making the input a wrapper around Seq[(K1,V1)]: if the keys are unique, this is 100% two-way convertible with a Map[K1,V1], but since the K1 keys can sometimes be missing entirely, we cannot properly form a Map (whereas a Map can always be represented as a Seq[Tuple2]).

It makes sense that the output from the reducer phase (and, ultimately, from the master) records both successful calls to the reducer and failures. This follows from the independent nature of the "reduce" phase. But what about errors in the mapper phase? If the mapper fails on even one input tuple, the entire mapping process is pretty much trashed. What would be the point of continuing on to the "reduce" phase after a mapper error? That is indeed the normal way of things: if there are any failures in mapping, the whole mapping fails. The form of (successful) output is Map[K2,Seq[W]], while any failure yields a Throwable (this is all part of the behavior of Future).

Nevertheless, there is an alternative form of mapper called _MapperForgiving which will return to the master (as a tuple) both the successful output and a sequence of Throwable objects. This behavior can be turned on by setting forgiving to true in the configuration.

Reducer

The Reducer class is a sub-class of Actor. In general, the Reducer takes the following polymorphic types: [K2,W,V2].

The constructor takes a function g of type (V2,W)=>V2, that's to say a function which recursively combines an accumulator of type V2 with an element of type W, yielding a new value for the accumulator. In other words, g is passed to the reduceLeft method of Seq.
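For instance, with W = Int and V2 = Int (V2 being a super-type of W, as reduceLeft requires), a reducer function has exactly the shape of addInts in the CountWords example below:

// Sketch: W = Int, V2 = Int.
def g(acc: Int, w: Int): Int = acc + w
// For each key, the reducer effectively computes ws.reduceLeft(g) over its ws: Seq[Int].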

The incoming message is of the form: Intermediate[K2,W] where Intermediate is essentially a wrapper around the input and is defined thus:

case class Intermediate[K, V](k: K, vs: Seq[V])

Where, in practice, K=K2 and V=W.

There is an alternative form of reducer: _ReducerFold. This type is designed for the situation where V2 is not a super-type of W, or where there is no natural function to combine a V2 with a W. In this case, we must use the foldLeft method of Seq instead of the reduceLeft method; the fold form takes an additional function z which initializes the accumulator.
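By way of contrast with the sketch above, here is the fold case, where V2 = Seq[String] is not a super-type of W = String (compare appendString in the CountWords example below):

// Sketch: W = String, V2 = Seq[String].
def z: Seq[String] = Nil                                   // initializes the accumulator
def g(acc: Seq[String], w: String): Seq[String] = acc :+ w // combines accumulator and element
// For each key, the fold-reducer effectively computes ws.foldLeft(z)(g) over its ws: Seq[String].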

Configuration

Configuration is based on Typesafe Config (as is normal with Akka applications). Please see the reference.conf file in the main/resources directory for the list of configurable parameters with their explanations.

Dependencies

The components that are used by this project are:

Testing

There are two directories (under src) for testing: test (unit tests/specifications) and it (integration tests). By default, all tests are on the classpath. The example applications are in the it directory, given that they are not really unit tests. If you wish to suppress the integration tests temporarily, simply un-mark it/scala as a test source root, or comment out the appropriate entry (unmanagedSourceDirectories) in build.sbt.

Examples

There are several examples provided (in the "src/it/scala/com/phasmid/majabigwaduce/examples" directory):

CountWords

Here is the CountWords app. It actually uses a "mock" URI rather than the real thing, but of course, it's simple to change it to use real URIs. I have not included the mock URI code (check current code for the most accurate version):

object CountWords {
  def apply(hc: HttpClient, args: Array[String]): Future[Int] = {
    val configRoot = ConfigFactory.load
    implicit val config: Config = configRoot.getConfig("CountWords")
    implicit val system: ActorSystem = ActorSystem(config.getString("name"))
    implicit val timeout: Timeout = getTimeout(config.getString("timeout"))
    implicit val logger: LoggingAdapter = system.log
    import ExecutionContext.Implicits.global    
    val ws = if (args.length > 0) args.toSeq else Seq("http://www.bbc.com/doc1", "http://www.cnn.com/doc2", "http://default/doc3", "http://www.bbc.com/doc2", "http://www.bbc.com/doc3")
    CountWords(hc.getResource).apply(ws)
  }

  def getTimeout(t: String): Timeout = {
    val durationR = """(\d+)\s*(\w+)""".r
    t match {
      case durationR(n, s) => new Timeout(FiniteDuration(n.toLong, s))
      case _ => Timeout(10 seconds)
    }
  }
}

case class CountWords(resourceFunc: String => Resource)(implicit system: ActorSystem, logger: LoggingAdapter, config: Config, timeout: Timeout, ec: ExecutionContext) extends (Seq[String] => Future[Int]) {
  type Strings = Seq[String]
  trait StringsZeros extends Zero[Strings] {
    def zero: Strings = Nil: Strings
  }
  implicit object StringsZeros extends StringsZeros
  trait IntZeros extends Zero[Int] {
    def zero: Int = 0
  }
  implicit object IntZeros extends IntZeros
  override def apply(ws: Strings): Future[Int] = {
    val stage1 = MapReduceFirstFold.create(
      { w: String => val u = resourceFunc(w); logger.debug(s"stage1 map: $w"); (u.getServer, u.getContent) },
      appendString
    )(config, system, timeout)

    val stage2 = MapReducePipe.create[URI, Strings, URI, Int, Int](
      (w, gs) => w -> (countFields(gs) reduce addInts),
      addInts,
      1
    )
    val stage3 = Reduce[URI, Int, Int](addInts)
    val mr = stage1 & stage2 | stage3
    mr(ws)
  }
  private def countFields(gs: Strings) = for (g <- gs) yield g.split("""\s+""").length
  private def addInts(x: Int, y: Int) = x + y
  private def appendString(a: Strings, v: String) = a :+ v
}

It is a three-stage map-reduce problem, including a final reduce stage.

Stage 1 takes a Seq[String] (representing URIs) and produces a Map[URI,Seq[String]]. The mapper for the first stage returns a tuple of the URI (corresponding to the server for the string), and the content of the resource defined by the string. The reducer simply adds a String to a Seq[String]. There is additionally an init function which creates an empty Seq[String]. The result of the first stage is a map of URI->Seq[String] where the key represents a server, and the value elements are the contents of the documents read from that server.

Stage 2 takes the result of the first stage and produces a Map[URI,Int]. The second stage mapper takes the URI and Seq[String] from the first stage, splits each string on white space to get the number of words, and returns the sum of the lengths. In practice (with the default args shown above), these sequences have between one and three strings each. The second stage reducer simply adds together the results of the mapping phase. The result of stage 2 is a map of URI->Int.

Stage 3 (a terminating stage which produces simply a value) takes the map resulting from stage 2, but simply sums the values (ignoring the keys) to form a grand total. This resulting value of Int is printed using println.

Note that the first stage uses MapReduceFirstFold, the second stage uses MapReducePipe, and the third (terminating) stage uses Reduce.

If the names of variables look a bit odd to you, then see my "ScalaProf" blog: http://scalaprof.blogspot.com/2015/12/naming-of-identifiers.html

WebCrawler

Here is the web crawler example app (of course, you should check the current code for accuracy):

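// Note: Strings here is an alias for Seq[String], as in the CountWords example above.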
case class WebCrawler(depth: Int)(implicit system: ActorSystem, config: Config, timeout: Timeout, ec: ExecutionContext) extends (Strings => Future[Int]) {

  trait StringsZero$ extends Zero[Strings] {
    def zero: Strings = Nil: Strings
  }
  implicit object StringsZero$ extends StringsZero$

  val stage1: MapReduce[String, URI, Strings] =
      MapReduceFirstFold(getHostAndURI, appendContent)(config, system, timeout)
  val stage2: MapReduce[(URI, Strings), URI, Strings] =
      MapReducePipeFold.create(getLinkStrings, joinWordLists, 1)(config, system, timeout)
  val stage3: Reduce[URI, Strings, Strings] =
      Reduce[URI, Strings, Strings](_ ++ _)
  val crawler: Strings => Future[Strings] = stage1 & stage2 | stage3

  override def apply(ws: Strings): Future[Int] =
      doCrawl(ws, Nil, depth) transform(
        { n => val z = n.length; system.terminate(); z },
        { x => system.log.error(x, "Map/reduce error (typically in map function)"); x }
      )

  private def doCrawl(ws: Strings, all: Strings, depth: Int): Future[Strings] =
    if (depth < 0) Future((all ++ ws).distinct)
    else {
      def cleanup(ws: Strings): Strings = ???

      def trim(s: String, p: Char): String = ???

      val (_, out) = ws.partition { u => all.contains(u) }
      for (ws <- crawler(cleanup(out)); gs <- doCrawl(ws.distinct, (all ++ out).distinct, depth - 1)) yield gs
    }

  private def getHostAndURI(w: String): (URI, URI) = ???

  private def appendContent(a: Strings, v: URI): Strings = ???

  private def getLinkStrings(u: URI, gs: Strings): (URI, Strings) = {
    def normalizeURL(w: URI, w2: String) = ???

    def getLinks(u: URI, g: String): Strings = for (
      nsA <- HTMLParser.parse(g) \\ "a";
      nsH <- nsA \ "@href";
      nH <- nsH.head
    ) yield normalizeURL(u, nH.toString)

    (u, (for (g <- gs) yield getLinks(u, g)) reduce (_ ++ _))
  }

  private def joinWordLists(a: Strings, v: Strings) = a ++ v
}

object WebCrawler extends App {
  implicit val config: Config = ConfigFactory.load.getConfig("WebCrawler")
  implicit val system: ActorSystem = ActorSystem(config.getString("name"))
  implicit val timeout: Timeout = getTimeout(config.getString("timeout"))        
  import ExecutionContext.Implicits.global  

  val ws = if (args.length > 0) args.toSeq else Seq(config.getString("start"))
  val crawler = WebCrawler(config.getInt("depth"))
  private val xf = crawler(ws)
  xf foreach (x => println(s"total links: $x"))
  Await.ready(xf, 10.minutes)

  def getTimeout(t: String) = {
    val durationR = """(\d+)\s*(\w+)""".r
    t match {
      case durationR(n, s) => new Timeout(FiniteDuration(n.toLong, s))
      case _ => Timeout(10 seconds)
    }
  }
}

The application is somewhat similar to the CountWords app but, because of the much greater load involved in reading all the documents at any level of recursion, the first stage performs the actual document reading during its reduce phase. Like CountWords, it has three stages.

The three stages combined as a pipeline called crawler are invoked recursively by the method doCrawl.

Because you cannot predict in advance what problems you will run into with badly formed (or non-existent) links, it is better to run this app in forgiving mode. Expect about 250 links to be visited given the default value of ws and depth of 2.

Future enhancements

Revision History