twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0

MemoryPlatform with unneeded .also can produce double writes #631

Closed johnynek closed 8 years ago

johnynek commented 8 years ago

In a case that looks like:

val b = a.map(fn).sumByKey(store1)
val c = b.map(gn).sumByKey(store2)
b.also(c)

the also is not needed, since c already depends on b. It seems having it there is causing double counting.

Gabriella439 commented 8 years ago

Choosing fn = gn = identity and store2 = NullStore for simplicity, my understanding of this issue is that this pipeline:

(a.sumByKey(store1)).also(a.sumByKey(store1))

... will write out to store1 twice. Is that correct?

johnynek commented 8 years ago

Yes, presumably that will show the bug as well (in the memory producer, not sure if it is a bug in other platforms).

Gabriella439 commented 8 years ago

The reason I asked is that I'm not sure whether I consider that behavior a bug. If I read that code, I would expect it to write out to the store twice.

ianoc commented 8 years ago

The contract at the producer layer is 'ensure', i.e. ensure that the other producer is included in the graph. You should be able to ensure the same bit of code as many times as you like; ensuring only guarantees it is planned, not that it runs more times.

Gabriella439 commented 8 years ago

Oh, I see. Alright, that sounds reasonable.

Gabriella439 commented 8 years ago

Actually, I was thinking about this some more and I'm wondering how you can actually enforce that ensure contract given that you can transform producers.

Let me give a simple example:

def f(x: T) = x
def g(x: T) = x

(a.map(f).sumByKey(store)).also(a.map(g).sumByKey(store))

Even though f and g are the same function, how would summingbird know that those are two duplicate pipelines and avoid a double write?

ianoc commented 8 years ago

Those are two independent producers, regardless of whether the functions they are running are the same. Here you've explicitly created two sumByKey producers that are unique, even though they are structurally equal.

The case that shouldn't break is:

val p = a.map(g).sumByKey(store)

a.map(f).sumByKey(store).also(p).also(p).also(p)
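The distinction ianoc is drawing can be sketched with a toy model (hypothetical, not the real summingbird API or planner): producers are deduplicated by reference identity when the graph is planned, so ensuring the same instance repeatedly via also is idempotent, while two structurally equal but distinct instances are planned (and run) separately.

```scala
// Toy producer graph, illustration only: none of these types exist in
// summingbird; they model the "ensure" contract described above.
sealed trait Prod
case class SumByKey(id: Int) extends Prod
case class Also(ensured: Prod, result: Prod) extends Prod

// Collect distinct sumByKey nodes reachable from the tail, deduplicating
// by reference identity (eq), not structural equality.
def plan(tail: Prod): List[SumByKey] = {
  def go(p: Prod, seen: List[Prod]): List[Prod] =
    if (seen.exists(_ eq p)) seen
    else p match {
      case _: SumByKey    => p :: seen
      case Also(ens, res) => go(res, go(ens, p :: seen))
    }
  go(tail, Nil).collect { case s: SumByKey => s }
}

val p = SumByKey(1) // one producer instance
val q = SumByKey(1) // structurally equal, but a distinct instance

val samePlanned = plan(Also(p, Also(p, p))) // p ensured multiple times
val bothPlanned = plan(Also(p, q))          // two independent producers
```

Here `samePlanned` contains a single sumByKey node (ensure is idempotent), while `bothPlanned` contains two, because dedup is by reference, not structure.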

Gabriella439 commented 8 years ago

So my concern then is that this breaks the law that:

p.map(identity) = p

... since attaching any function at all changes whether or not the producer is treated as a new independent producer.

johnynek commented 8 years ago

So, one other thing: each store can only appear once in the graph as a sumByKey target.

So, either .also in the above is idempotent, or the plan fails. So, while yes, the functor law p.map(identity) == p is violated, it is only violated for invalid graphs which should not be planned.

Since we can define Equiv[Producer[P, T]] such that equivalent Producers yield identical plans/computations, the functor law is obeyed under that equivalence (if we consider all unplannable graphs as equal).
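The "either idempotent or the plan fails" rule can be sketched as follows (a hypothetical toy validator, not the real planner): after deduplicating summer instances by reference, reject the plan if the same store instance is still the target of more than one sumByKey.

```scala
// Illustration only: Store and Summer are hypothetical stand-ins.
class Store(val name: String)
case class Summer(store: Store)

// Returns Right(plan) if stores are unique, Left(error) otherwise.
def validate(summers: List[Summer]): Either[String, List[Summer]] = {
  // First dedupe summer instances by reference: ensure is idempotent.
  val distinct = summers.foldLeft(List.empty[Summer]) { (acc, s) =>
    if (acc.exists(_ eq s)) acc else s :: acc
  }
  val stores = distinct.map(_.store)
  // Then any store targeted by two distinct summers makes the plan invalid.
  if (stores.exists(st => stores.count(_ eq st) > 1))
    Left("store appears twice as a sumByKey target")
  else
    Right(distinct)
}

val store1 = new Store("store1")
val s = Summer(store1)

val ok  = validate(List(s, s, s)) // same instance ensured three times
val bad = validate(List(Summer(store1), Summer(store1))) // two summers, one store
```

Under this sketch, the repeated-ensure graph plans fine (the duplicates collapse), while the graph with two distinct summers over the same store fails to plan, matching the claim that the functor law is only violated on graphs that are invalid anyway.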

Gabriella439 commented 8 years ago

Does duplicate store detection work even when you can transform the store? For example, what if I do store.mapValues(IdentityBijection)? Is that a new store for summingbird's purposes?

The reason I'm asking about this is that I'd expect this law to hold:

store.mapValues(IdentityBijection) = store

Also, in general defining behavior in terms of intensional equality instead of extensional equality makes me very nervous.
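The worry can be made concrete with a small sketch (hypothetical Store and mapValues, for illustration only): any transform necessarily constructs a new object, so a planner that compares stores by reference identity cannot see that the wrapped store is extensionally "the same" store.

```scala
// Illustration only: a stand-in for a store with a value transform.
class Store(val name: String) {
  // Even an identity transform allocates a fresh instance.
  def mapValues(f: Int => Int): Store = new Store(name)
}

val store   = new Store("s")
val wrapped = store.mapValues(identity)
// Extensionally the same store; intensionally (by reference) distinct,
// so reference-based duplicate detection would treat them as different.
```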

johnynek commented 8 years ago

If we had our time again, maybe .sumByKey would return Plan[Producer[P, (K, (Option[V], V))]] and .write would return Plan[Producer[P, T]] for some monad Plan[T], and the side-effect is captured in that Plan type. But we didn't do that.

So, we need some way to compose two jobs together, and as a pattern we copied Haskell's seq function, which you (@gabriel439) understand better than anyone on this thread.

Given the API we have, which generally works pretty well, it does seem like we have some constraints the type system can't check: namely, what happens if we try to write to the same HDFS locations from two different parts of the job? Answer currently is: don't do that. It would be nice if it didn't compile. It should at least fail at runtime. I'm not 100% sure what will happen in all cases now.
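The hypothetical Plan type described above could be sketched as a small writer-like monad that records each store written to as the plan is built, so duplicate writes become observable at plan-construction time. None of these names exist in summingbird; this is only an illustration of the design alternative.

```scala
// Hypothetical Plan monad: accumulates the side effects (store writes)
// alongside the value, instead of performing them implicitly.
case class Plan[A](writes: List[String], value: A) {
  def map[B](f: A => B): Plan[B] = Plan(writes, f(value))
  def flatMap[B](f: A => Plan[B]): Plan[B] = {
    val next = f(value)
    Plan(writes ++ next.writes, next.value)
  }
}

object Plan {
  def pure[A](a: A): Plan[A] = Plan(Nil, a)
  // sumByKey as a recorded effect: the target store is logged in the plan.
  def sumByKey[A](store: String, a: A): Plan[A] = Plan(List(store), a)
}

val planned: Plan[Int] = for {
  b <- Plan.sumByKey("store1", 1)
  c <- Plan.sumByKey("store2", b + 1)
} yield c
// planned.writes lists each targeted store, so a duplicate target
// could be detected (or rejected) when the plan is assembled.
```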

Gabriella439 commented 8 years ago

I'm not referring to issues with tying plan building to order of evaluation nor issues with catching the error at runtime vs. compile time. I am focusing on the problem of detecting duplicate stores at runtime without violating the identity law that I referenced.

I'm even okay with deliberately violating the identity law, but if we choose to do so, we need to mention this issue pretty prominently somewhere in the Summingbird documentation. I consider any law violation to be a nice focal point for discussing semantically significant details of Summingbird's implementation that users need to be aware of. In this case the relevant topic for the law violation is that sinks cannot be duplicated.

johnynek commented 8 years ago

Closing this until we have a repro. Reopen if there is a real bug.