twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0

Memory platform can remove side effects #681

Open jnievelt opened 8 years ago

jnievelt commented 8 years ago

This may have impact beyond Memory.

If you set up a job like:

import com.twitter.summingbird.memory.Memory

val source = Memory.toSource(Seq(1,2,3))
val mapped = source.map { x => x -> 1 }
val m = new Memory
val store1 = scala.collection.mutable.Map.empty[Int, Int]
val store2 = scala.collection.mutable.Map.empty[Int, Int]
// Two sumByKey sinks over the same `mapped` producer, one per store
val tail = mapped.sumByKey(store1).also(mapped.sumByKey(store2))

m.run(m.plan(tail))

You would expect, for example, that store1 and store2 have the same contents. Instead, store1 has the expected contents while store2 is empty.
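For reference, here is a plain-Scala sketch of what both stores should hold after the run. It does not use Summingbird. `sumInto` is a hypothetical helper that mimics summing into a Memory store (add each value to whatever the store already holds for that key), assuming integer addition as the semigroup.

```scala
import scala.collection.mutable

object ExpectedResult {
  // Hypothetical stand-in for sumByKey on the Memory platform: for every
  // (k, v), add v to the value already stored for k (0 if absent).
  // Illustrative only, not Summingbird's implementation.
  def sumInto(store: mutable.Map[Int, Int], pairs: Seq[(Int, Int)]): Unit =
    pairs.foreach { case (k, v) => store(k) = store.getOrElse(k, 0) + v }

  def run(): (mutable.Map[Int, Int], mutable.Map[Int, Int]) = {
    // Same shape as the repro: source.map { x => x -> 1 }
    val mapped = Seq(1, 2, 3).map(x => x -> 1)
    val store1 = mutable.Map.empty[Int, Int]
    val store2 = mutable.Map.empty[Int, Int]
    // Both branches of `also` consume the same mapped stream.
    sumInto(store1, mapped)
    sumInto(store2, mapped)
    (store1, store2)
  }
}
```

Each key appears once in the input, so both stores should end up as `Map(1 -> 1, 2 -> 1, 3 -> 1)`.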

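In Memory#toStream, we are careful to compare producers only by reference equality (Identity, etc.). Before that, however, we run the DagOptimizer, which does not: it compares producers structurally, so the second sumByKey is treated as a duplicate of the first and dropped.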
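To make the two notions of equality concrete, here is a small model. It is not Summingbird code: `Source`, `Mapped` and `SumByKey` are hypothetical case classes standing in for the real Producer nodes, and deduplicating through a `Set` is an assumed stand-in for whatever the DagOptimizer does internally. Because case-class equality also compares the store fields, and two empty `mutable.Map`s are equal, the two sink nodes collapse into one under `==` but stay distinct under reference equality.

```scala
import scala.collection.mutable

// Hypothetical, simplified producer graph (NOT Summingbird's Producer types).
sealed trait Node
final case class Source(items: Seq[Int]) extends Node
// Functions compare by reference, just like closures in a real Producer.
final case class Mapped(parent: Node, fn: Int => (Int, Int)) extends Node
final case class SumByKey(parent: Node, store: mutable.Map[Int, Int]) extends Node

object Dedup {
  // Structural dedup: nodes that are == collapse into a single entry.
  def structuralCount(nodes: Seq[Node]): Int = nodes.toSet.size

  // Reference dedup: only the very same instance is merged.
  def referenceCount(nodes: Seq[Node]): Int = {
    val seen = java.util.Collections.newSetFromMap(
      new java.util.IdentityHashMap[Node, java.lang.Boolean]())
    nodes.foreach(n => seen.add(n))
    seen.size
  }
}
```

With two `SumByKey` nodes sharing the same `Mapped` parent and each holding its own empty map, `structuralCount` reports one node while `referenceCount` reports two.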

If we make either of the following changes:

val tail = mapped.sumByKey(store1).also(mapped.map(identity).sumByKey(store2))
val store2 = scala.collection.mutable.Map(100 -> 100)

Then we break the Producer equality (by giving the second sumByKey a different upstream Producer or a different Store), so it isn't optimized away and the execution gives the expected result.
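The collision comes from Scala's mutable maps using content-based equality. A short plain-Scala sketch (no Summingbird) showing that two fresh empty stores compare equal even though they are different objects, that the pre-seeded store from the second workaround does not, and that a store's equality can change once something is written into it:

```scala
import scala.collection.mutable

object StoreEquality {
  // Two fresh, empty stores: different objects, equal contents.
  val store1 = mutable.Map.empty[Int, Int]
  val store2 = mutable.Map.empty[Int, Int]
  // The second workaround from the report: give store2 different contents.
  val seeded = mutable.Map(100 -> 100)

  // Equality follows contents, so it flips once one store is written to.
  def equalBeforeAndAfterWrite(): (Boolean, Boolean) = {
    val a = mutable.Map.empty[Int, Int]
    val b = mutable.Map.empty[Int, Int]
    val before = a == b
    a(1) = 1 // stand-in for a sumByKey writing into one store
    (before, a == b)
  }
}
```

Since equality tracks contents, keying a DAG node on a mutable store is fragile: two unrelated stores look identical while empty, and a single store's hash changes as it fills.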

johnynek commented 8 years ago

(Replying by email to Joe Nievelt's message of Tue, Aug 9, 2016 at 07:28.)

I guess the store equality is screwing us here. We need each store to be unique, right? This can be fixed by making a reference-equality wrapper type for stores.
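A minimal sketch of the wrapper suggested here, assuming the optimizer compares stores through `==` and `hashCode`. `StoreRef` is a hypothetical name, not an existing Summingbird type: equality and hashing delegate to the identity of the wrapped store, so distinct stores never compare equal regardless of their contents.

```scala
import scala.collection.mutable

// Hypothetical reference-equality wrapper for stores.
final class StoreRef[S <: AnyRef](val underlying: S) {
  // Widen to AnyRef so `eq` is available on instances of any StoreRef[_].
  private def ref: AnyRef = underlying

  // Two wrappers are equal only if they wrap the very same store object.
  override def equals(that: Any): Boolean = that match {
    case other: StoreRef[_] => other.ref eq ref
    case _                  => false
  }

  // Identity hash: stable even as the store's contents change.
  override def hashCode: Int = System.identityHashCode(underlying)
}
```

Embedding a `StoreRef` in a node instead of the raw map would make two sumByKey nodes over different stores unequal even when both stores are empty, while wrapping the same store twice still compares equal.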
