jwills / driskill

Either[Hotel in Austin, Prototype of a Scala Distributed Collections API]

Shuffle/Grouping #4

Open johnynek opened 11 years ago

johnynek commented 11 years ago

The way I see it there are two traits: DistributedItems[T] and the result of a shuffle: KeyedItems[K, V].

I would have something like:

// no ordering guaranteed
trait DistributedItems[T] {
  def flatMap[U](fn: T => TraversableOnce[U]): DistributedItems[U]
  // stuff here in terms of the above: filter, map, keys, values, etc...

  // TODO take an implicit evidence of serializability of K, T
  def groupBy[K](gfn: T => K)(implicit ord: Ordering[K]): KeyedItems[K, T]
  // TODO, do we handle write at all or let that be a platform detail?
}

trait KeyedItems[K,V] {
  // Hadoop can do this, but more than once doesn't make sense
  def sorted(implicit ord: Ordering[V]): KeyedItems[K, V]
  // Maybe use something other than Stream, something that doesn't let
  // the user hold a ton in memory
  def mapValues[U](fn: (K, Stream[V]) => Stream[U]): KeyedItems[K, U]
  // worth calling out, since this case can be optimized mapside.
  def reduce(implicit cs: CommutativeSemigroup[V]): DistributedItems[(K,V)]
  // fold, scan, etc. can be written in terms of mapValues
  def toItems: DistributedItems[(K,V)]
  def cogroup[U, R](kl: KeyedItems[K,U])(fn: (K, Stream[V], Stream[U]) => Stream[R]): KeyedItems[K, R]
}
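
To sanity-check the shape, here is word count in terms of the above (sketch only; it assumes some CommutativeSemigroup[Int] instance that adds Ints):

def wordCount(lines: DistributedItems[String]): DistributedItems[(String, Int)] =
  lines
    .flatMap(_.split("\\s+"))               // DistributedItems[String]
    .groupBy(identity)                      // KeyedItems[String, String], shuffles
    .mapValues((_, ws) => ws.map(_ => 1))   // KeyedItems[String, Int]
    .reduce                                 // sums per key via CommutativeSemigroup[Int]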
jwills commented 11 years ago

So I think of this as the thorniest API difference, and my inclination is to leave it to Suereth to arbitrate it.

Scoobi and Spark both have a single dataset type (DList[T] and RDD[T]), and you do shuffle-related operations on (K, V) element types via an implicit conversion to an extension of DList/RDD that supports those operations. Scrunch and Scalding both have a distinct type for key-value constructs (PTable/KeyedList) that is constructed from the distributed single-item type and converted back to it (implicitly/explicitly) once the shuffle-related work is done. I think it's possible to implement either style in terms of the other, and there are pros and cons to each approach; we should work this out on the mailing list once we get it up and running.
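
Roughly, the two styles look like this (names and signatures are illustrative, not the actual APIs of any of the four projects):

object ApiStyles {
  // Style A (Scoobi/Spark): one dataset type; pair operations appear
  // via an implicit conversion when the element type is a tuple.
  trait DList[T] {
    def map[U](f: T => U): DList[U]
  }
  class PairOps[K, V](self: DList[(K, V)]) {
    def groupByKey: DList[(K, Iterable[V])] = ???
  }
  implicit def toPairOps[K, V](d: DList[(K, V)]): PairOps[K, V] =
    new PairOps(d)

  // Style B (Scrunch/Scalding): a distinct keyed type, constructed from
  // the single-item type and converted back once the shuffle work is done.
  trait PTable[K, V] {
    def toDList: DList[(K, V)]
  }
}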

With Driskill as a prototype, we're mainly interested in understanding if we can do the simplest possible thing with a common API across every project: count words. I don't mean for anything I write here to be seen as what the API should look like in its final (or even v0) form.

johnynek commented 11 years ago

I'd love to hear more thoughts from the other side.

The type safety of calling out when a shuffle will happen is something I like. In Scalding we have a group method that explicitly does the shuffle on a DList[T] when T <:< (K, V). The explicit nature is nice.
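
The pattern I mean, roughly (a sketch, not Scalding's actual signature):

trait DList[T] {
  // Only compiles when T is provably a pair, so the shuffle is
  // visible in the types at the call site.
  def group[K, V](implicit ev: T <:< (K, V), ord: Ordering[K]): KeyedItems[K, V]
}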

blever commented 11 years ago

I have a few comments on this:

Agree with @jwills though: leave it up to Suereth to arbitrate. I do think the thornier issue is how to abstract the semantics of a group across the two (or more) platforms. MapReduce obviously has the sort-and-shuffle; Scoobi has a Grouping type class that encapsulates this, so you can implement secondary sorts, different partitioners, etc. Scrunch and Scalding likely have similar facilities. However, Spark will likely have different requirements (I say that having little working knowledge of Spark).
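
The kind of hook I mean, loosely modeled on Scoobi's Grouping (illustrative signatures, not the real ones):

trait Grouping[K] {
  def partition(key: K, numPartitions: Int): Int  // which reducer sees the key
  def sortCompare(a: K, b: K): Int                // ordering within a partition (secondary sort)
  def groupCompare(a: K, b: K): Int               // which keys fall into one group
}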

jsuereth commented 11 years ago

Hey guys. I'll weigh in a bit here. Sorry, a bit swamped preparing this release for ScalaDays. I should be available to dig in next week once our crunch period is over.

jsuereth commented 11 years ago

Also -> When it comes down to it, I think we may be playing the "How much can Scala infer" game on both sides of this issue.

I believe the issue boils down to a few axes:

Which can Scala infer better?

  1. What return type should we use after mapping/flatMapping/grouping? DList, DMap?
  2. Is the element type of a collection a tuple?

This, IMHO, is the hardest part of Scala API design. Complicate it with the general strategy of tracking lexical information using implicits, or with implicit-based type classes, and you can get into a grand ole inference quandary (look at Scala's collections lib).
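
Scala 2's collections play this game with CanBuildFrom, which picks the result type of map. A distributed analogue (purely hypothetical, just to show the shape) might be:

trait DCanBuildFrom[-From, Elem, +To]

trait DCollection[T] {
  // `That` could resolve to a keyed or a flat collection depending on
  // which implicit witness is in scope for the element type U.
  def map[U, That](f: T => U)(implicit bf: DCanBuildFrom[DCollection[T], U, That]): That
}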

Can I better optimise my runtime if I have semantic knowledge of Key-Value data storage types?

What has the most user-friendly interface?

At least, those are the trade-offs my brain can currently cogitate, given my "ZOMG RELEASE" distraction. Would like to hear everyone's thoughts on these.