twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0

Memory platform misbehaves with use of also #645

Closed by pankajroark 8 years ago

pankajroark commented 8 years ago

Test that fails:

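import scala.collection.mutable

import com.twitter.algebird.Semigroup
import com.twitter.summingbird.memory.Memory

// TestBase is the reporter's own base class (presumably a ScalaTest FunSuite).
class SbAlsoProducerTest extends TestBase {

  case class Key(i: Int)
  case class Value(v: Int)

  test("Two events for same entity combo should calculate correctly") {
    val platform = new Memory
    val source = Memory.toSource[Value](Seq(1, 2).map(Value(_)))
    val store: Memory#Store[Key, Value] = mutable.Map.empty[Key, Value]
    // Sink that discards its input; only the store contents are checked.
    val sink: Memory#Sink[Value] = _ => ()

    // Semigroup used by sumByKey to merge values stored under the same key.
    implicit val sgrp: Semigroup[Value] = new Semigroup[Value] {
      override def plus(l: Value, r: Value): Value = Value(l.v + r.v)
    }

    // sumByKey emits (key, (value stored before this event, this event))
    // and folds the event into the store.
    val summed = source
      .map { v => (Key(v.v), v) }
      .sumByKey(store)
      .map { case (_, (existingEventOpt, currentEvent)) =>
        println(s"existing event $existingEventOpt")
        println(s"current event $currentEvent")
        existingEventOpt.map { existingEvent =>
          sgrp.plus(existingEvent, currentEvent)
        }.getOrElse(currentEvent)
      }

    // Two writes of the same `summed` producer, combined with `also`.
    val write1 = summed.write(sink)
    val write2 = summed.write(sink)
    val job = write1.also(write2)

    platform.run(platform.plan(job))
    // The store holds Value, so compare against Value(...), not a bare Int.
    assert(Value(1) == store(Key(1)))
    assert(Value(2) == store(Key(2)))
  }
}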
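The assertions depend on what `sumByKey` leaves in the store after a single pass over the source. The following is a hypothetical stand-alone model of that behaviour, not Summingbird's implementation: `SumByKeyModel` and its `sumByKey` helper are illustrative names. It only mirrors the `(key, (Option[previous value], current value))` shape that the test destructures. With keys `Key(1)` and `Key(2)` each seen once, nothing is stored beforehand and the store ends up holding `Value(1)` and `Value(2)`.

```scala
import scala.collection.mutable

// Hypothetical model of sumByKey semantics (NOT Summingbird's code):
// for each (key, value) emit (key, (previously stored value, value))
// and then fold the value into the store with the semigroup's plus.
object SumByKeyModel {
  case class Key(i: Int)
  case class Value(v: Int)

  def plus(l: Value, r: Value): Value = Value(l.v + r.v)

  def sumByKey(
      events: Seq[(Key, Value)],
      store: mutable.Map[Key, Value]
  ): Seq[(Key, (Option[Value], Value))] =
    events.map { case (k, v) =>
      val prev = store.get(k)                       // value before this event
      store(k) = prev.map(plus(_, v)).getOrElse(v)  // merge into the store
      (k, (prev, v))
    }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[Key, Value]
    val out = sumByKey(Seq(1, 2).map(i => (Key(i), Value(i))), store)
    out.foreach(println)
    // A single pass sees each key once, so the store holds the raw values.
    println(store(Key(1)))
    println(store(Key(2)))
  }
}
```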

I see that the map containing the print statements is called four times instead of twice, as if `summed` were evaluated once for each of the two writes.
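One plausible reading of the symptom, which the thread itself does not confirm, is that the Memory planner re-evaluates the shared `summed` node once per downstream write instead of computing it once and fanning the result out. The sketch below models that hypothesis without Summingbird: `SharedNodeModel`, `run`, `writes` and `memoize` are made-up names. Re-evaluating per write yields four map calls and a store of `Value(2)`/`Value(4)`, because the second pass sees the first pass's stored values. Memoizing yields two calls and `Value(1)`/`Value(2)`.

```scala
import scala.collection.mutable

// Hypothetical model (not Summingbird APIs): a source -> sumByKey -> map
// pipeline whose output is consumed by `writes` sinks.
object SharedNodeModel {
  case class Key(i: Int)
  case class Value(v: Int)

  // Returns (number of map-stage invocations, final store contents).
  // memoize = false: the shared stage is recomputed for every sink.
  // memoize = true:  it is computed once and the result reused by all sinks.
  def run(writes: Int, memoize: Boolean): (Int, mutable.Map[Key, Value]) = {
    val store = mutable.Map.empty[Key, Value]
    var mapCalls = 0

    // One evaluation of the shared stage: update the store per event and
    // emit existing + current, like the map in the test.
    def summed(): Seq[Value] =
      Seq(1, 2).map(Value(_)).map { v =>
        val k = Key(v.v)
        val prev = store.get(k)
        val updated = prev.map(p => Value(p.v + v.v)).getOrElse(v)
        store(k) = updated
        mapCalls += 1
        updated
      }

    val sink: Value => Unit = _ => () // discards its input
    if (memoize) {
      val shared = summed() // evaluated once
      (1 to writes).foreach(_ => shared.foreach(sink))
    } else {
      (1 to writes).foreach(_ => summed().foreach(sink)) // once per sink
    }
    (mapCalls, store)
  }

  def main(args: Array[String]): Unit = {
    println(run(writes = 2, memoize = false))
    println(run(writes = 2, memoize = true))
  }
}
```

Under this model, a fix amounts to making the planner treat `summed` as a single shared node regardless of how many writes depend on it.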

johnynek commented 8 years ago

related to #631

johnynek commented 8 years ago

Thanks for finding a case that fails. I'll look into it.