tresata / spark-sorted

Secondary sort and streaming reduce for Apache Spark
Apache License 2.0

Add reduceLeftByKey method #6

Closed alno closed 9 years ago

alno commented 9 years ago

Hi!

This PR implements a reduceLeftByKey method, which is similar to foldLeftByKey but doesn't require an initial value.

Though it's very simple to implement, its usage is a lot more straightforward than mapStreamByKey(iter => Iterator(iter.reduceLeft(f))).
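For context, here is a minimal sketch of that equivalence on plain Scala collections rather than an RDD (the names below are illustrative, not the library API):

```scala
// Sketch: reduceLeftByKey(f) behaves like
// mapStreamByKey(iter => Iterator(iter.reduceLeft(f))),
// shown here for a single key's value stream.
object ReduceLeftSketch {
  def main(args: Array[String]): Unit = {
    val f = (acc: String, v: String) => acc + v

    // One key's values, already secondary-sorted; def gives a fresh iterator each time
    def values = Iterator("a", "b", "c")

    // The mapStreamByKey spelling: wrap the single reduced value in an Iterator
    val viaMapStream = Iterator(values.reduceLeft(f)).toList

    // The proposed reduceLeftByKey spelling: reduce the stream directly
    val viaReduceLeft = List(values.reduceLeft(f))

    println(viaMapStream == viaReduceLeft) // prints: true
  }
}
```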

koertkuipers commented 9 years ago

hey alno, thanks for this! what would be the motivation for using Groupsorted.reduceLeftByKey instead of PairRDDFunctions.reduceByKey?

alno commented 9 years ago

Sometimes the operation is non-commutative, so order matters. For example, suppose we have something like:

case class Update(recordId: Long, lastUpdateTime: Date, someFieldUpdate: Option[Int], otherFieldUpdate: Option[String], ... etc ...)

We want to merge consecutive updates of each record, so Update(11, yesterday, Some(11), Some("old")) + Update(11, today, Some(12), None) becomes Update(11, today, Some(12), Some("old"))

When merging, we should sort each group's values by time; otherwise old updates may come after new ones and their values may override the newer ones.
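A small self-contained sketch of that merge on a plain time-sorted List (the merge function here is an assumption about the intended semantics: a later update wins only for fields it actually sets):

```scala
import java.time.LocalDate

// Illustrative version of the Update case class from above
case class Update(recordId: Long,
                  lastUpdateTime: LocalDate,
                  someFieldUpdate: Option[Int],
                  otherFieldUpdate: Option[String])

object MergeSketch {
  // Non-commutative merge: the right (later) update overrides fields it sets,
  // and falls back to the left (earlier) update for fields it leaves as None
  def merge(a: Update, b: Update): Update =
    Update(b.recordId,
           b.lastUpdateTime,
           b.someFieldUpdate.orElse(a.someFieldUpdate),
           b.otherFieldUpdate.orElse(a.otherFieldUpdate))

  def main(args: Array[String]): Unit = {
    val yesterday = LocalDate.now().minusDays(1)
    val today = LocalDate.now()

    // Already sorted by lastUpdateTime, as the secondary sort guarantees
    val updates = List(
      Update(11, yesterday, Some(11), Some("old")),
      Update(11, today, Some(12), None))

    // reduceLeft over the time-sorted values keeps the newest value per field
    val merged = updates.reduceLeft(merge)
    println(merged)
  }
}
```

Applying merge in the reverse order would let the stale Some(11) and Some("old") win, which is exactly why the time-sorted left-to-right reduce matters.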

koertkuipers commented 9 years ago

Thanks, that makes sense.

Should it be:

def reduceLeftByKey[W >: V](f: (W, V) => W): RDD[(K, W)]

That would allow operations similar to this (contrived) example using List.reduceLeft:

scala> val x = List(1,2,3)
x: List[Int] = List(1, 2, 3)
scala> x.reduceLeft((a: Any, b: Int) => a.toString + b.toString)
res2: Any = 123
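To illustrate the proposed signature itself, here is a hypothetical plain-collections analogue of reduceLeftByKey[W >: V] (the helper name and Map-based shape are illustrative, not the library API):

```scala
// Sketch of reduceLeftByKey[W >: V](f: (W, V) => W): a per-key reduceLeft
// whose accumulator type W may be a supertype of the value type V.
object PerKeySketch {
  def reduceLeftByKey[K, V, W >: V](grouped: Map[K, List[V]])(f: (W, V) => W): Map[K, W] =
    grouped.map { case (k, vs) => k -> vs.reduceLeft(f) }

  def main(args: Array[String]): Unit = {
    val grouped = Map("a" -> List(1, 2, 3), "b" -> List(4, 5))

    // Accumulator is Any (a supertype of Int), concatenating as strings,
    // mirroring the List.reduceLeft example above
    val r = reduceLeftByKey[String, Int, Any](grouped)((w, v) => w.toString + v.toString)
    println(r)
  }
}
```

Without the W >: V bound the accumulator would be forced to the value type V, ruling out widening reductions like the one above.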

alno commented 9 years ago

Yes, you're right. Updated branch.

koertkuipers commented 9 years ago

merged. thanks again