tresata / spark-sorted

Secondary sort and streaming reduce for Apache Spark
Apache License 2.0

Duplicate keys in mapStreamByKey (spark-sorted: 0.3; scala: 2.10.4; spark: 1.3.1) #4

lvsl-deactivated closed this issue 9 years ago

lvsl-deactivated commented 9 years ago

Hi, in the following example mapStreamByKey produces duplicate keys:

val binRDD = sc.binaryFiles("file://...")
val pairs: RDD[(String, SomeCaseClass)] = binRDD.flatMap(parseBinaryStream)
val sorted: GroupSorted[String, SomeCaseClass] = pairs.groupSort(Some(implicitly[Ordering[SomeCaseClass]]))
val mapped: RDD[(String, AggValues)] = sorted.mapStreamByKey(aggregateSomeCaseClass)
mapped.collect.foreach(o => { /* write o._1 to a text file */ })

When I check the text file I get the following:

$ gzcat /tmp/all.txt.gz | wc -l
729109
$ gzcat /tmp/all.txt.gz | sort | uniq | wc -l
690618

But as far as I understand, mapStreamByKey should process each key exactly once. Am I missing something?

koertkuipers commented 9 years ago

hey leonid,

mapStreamByKey takes in an Iterator[V] of values per key and returns a TraversableOnce[W]. it should process every key exactly once, but there is no guarantee that it produces just one result per key.

can you provide some more detail on aggregateSomeCaseClass? if it returns at most one element per key then you should not have duplicates in the output, and this could indeed be a bug.
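for illustration, here is a minimal sketch (not from the original thread; it assumes a SparkContext named sc and the spark-sorted implicits in scope) of how a multi-element iterator turns into multiple output records for the same key:

import org.apache.spark.rdd.RDD
import com.tresata.spark.sorted.PairRDDFunctions._

// two values under key "a", one under key "b"
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// sort the values within each key, as in the example above
val sorted = pairs.groupSort(Some(implicitly[Ordering[Int]]))

// identity stream function: every incoming value becomes an output record,
// so key "a" shows up twice in the result
val out = sorted.mapStreamByKey(values => values.toList)

out.collect // Array((a,1), (a,2), (b,3))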

lvsl-deactivated commented 9 years ago

Hi, thanks for quick response!

aggregateSomeCaseClass returns an Iterator[AggregateCaseClass]; the function looks something like this:

def aggregateSomeCaseClass(items: Iterator[SomeCaseClass]): Iterator[AggregateCaseClass] = {
  val aggItems: SortedMap[String, AggregateCaseClass] = ...

  aggItems.valuesIterator
}

koertkuipers commented 9 years ago

it looks like you return multiple values per key that way.

in your example the number of returned values per key is the size of aggItems, which can be bigger than 1. each returned value becomes a separate line in the output.

for example if for a given key aggItems has size 5 then you will find 5 lines in the output for that same key.
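if the goal is exactly one output record per key, the stream function has to collapse the whole iterator into a single element. a sketch of that (AggValues.empty and combine are hypothetical stand-ins, not from this thread):

def aggregateToOne(items: Iterator[SomeCaseClass]): Iterator[AggValues] = {
  // fold the entire value stream for this key into one aggregate
  val total = items.foldLeft(AggValues.empty)((agg, item) => agg.combine(item))
  // a single-element iterator means a single output line per key
  Iterator.single(total)
}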

lvsl-deactivated commented 9 years ago

Thanks! As mentioned in #5, mapStreamByKey should be named flatMapStreamByKey; that would help avoid some confusion.