twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0
2.14k stars, 267 forks

Summingbird doesn't pick correct option when same setting defined on multiple nodes #671

Open pankajroark opened 8 years ago

pankajroark commented 8 years ago

I was trying to apply different MaxWaitingFutures settings to the flatMapper and the summer in a simple source -> flatMap -> sink topology. The setting from the flatMapper is getting applied to the summer.

I was able to simulate this in a unit test. This test fails when added to TopologyTests:

"With same setting on multiple names we use the one for the node" in {
  val fmNodeName = "flatMapper"
  val smNodeName = "summer"
  val p = Storm.source(TraversableSpout(sample[List[Int]]))
    .flatMap(testFn).name(fmNodeName)
    .sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName)

  val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)),
    smNodeName -> Options().set(SummerParallelism(20)))
  val storm = Storm.local(opts)
  val stormTopo = storm.plan(p).topology
  val bolts = stormTopo.get_bolts

  // Tail should use parallelism specified for the summer node
  assert(bolts("Tail").get_common.get_parallelism_hint == 20)
}

The flatMap node's SummerParallelism setting overrides that of the summer node.

pankajroark commented 8 years ago

The test I provided is failing for some other reason; let me fix the test.

pankajroark commented 8 years ago

Updated the test in original comment.

ianoc commented 8 years ago

Does this test pass or fail? You could also open a PR with it as a simple failing unit test.

pankajroark commented 8 years ago

That test fails. Let me create a PR with the failing test.

johnynek commented 8 years ago

Honestly, naming and options are weird, but we always felt like we could not fix them without breaking existing jobs.

We need a clear specification and more tests, I think.

pankajroark commented 8 years ago

@johnynek curious what alternatives to options you considered. I don't think many of our users grok how settings applied via naming flow through the graph, so I agree that options do seem weird to our users. I also agree that changing them without breaking existing jobs would be critical, so it's a tough problem. Still curious to know what your ideal solution would be.

johnynek commented 8 years ago

Well, we didn't think of removing options, but they were originally added when Summingbird had no fan-out. We wanted the look of a builder:

val myNode = fn(someStuff).name("myNode")

So that means the name applies to what is above it, which is kind of the opposite of how data flows, and that is weird. Next, there are nodes that are added by implicit conversions (e.g. IdentityKeyedProducer), so either you special-case those, or you have names apply to everything above. But then the fan-out case is strange:

val a = fn(someStuff)
val b = a.map(g).name("b")
val c = a.map(h).name("c")

What names apply to a? It would be nice if each node could have only one name, but it does not seem possible if we keep the builder style.
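To make the fan-out question concrete, here is a minimal toy model of the "a name applies to everything above it" rule described in this comment. It is only a sketch: `Node`, `Source`, `Mapped`, `Named`, and `NameFlow` are hypothetical types invented for illustration, not Summingbird's Producer API, and the rule is encoded as this thread describes it rather than as the planner implements it.

```scala
// Toy graph model (hypothetical; not Summingbird's Producer types).
sealed trait Node { def parents: List[Node] }
final case class Source(id: String) extends Node {
  val parents: List[Node] = Nil
}
final case class Mapped(parent: Node, id: String) extends Node {
  val parents: List[Node] = List(parent)
}
final case class Named(parent: Node, name: String) extends Node {
  val parents: List[Node] = List(parent)
}

object NameFlow {
  // Every node reachable by walking upstream from `n`, including `n` itself.
  def upstream(n: Node): Set[Node] =
    n.parents.foldLeft(Set[Node](n))((acc, p) => acc ++ upstream(p))

  // For each non-name node, collect every name attached somewhere below it,
  // i.e. every name that "applies above" and therefore reaches that node.
  def namesFor(tails: List[Node]): Map[Node, Set[String]] = {
    val named = tails.flatMap(t => upstream(t).toList).collect { case n: Named => n }.distinct
    val pairs = for {
      n <- named
      target <- upstream(n.parent).toList
      if !target.isInstanceOf[Named]
    } yield target -> n.name
    pairs.groupBy(_._1).map { case (node, ps) => node -> ps.map(_._2).toSet }
  }
}

object FanOutExample {
  // Mirrors the snippet above: two named branches hanging off the same `a`.
  val a: Node = Source("a")
  val b: Node = Named(Mapped(a, "g"), "b")
  val c: Node = Named(Mapped(a, "h"), "c")
  val names: Map[Node, Set[String]] = NameFlow.namesFor(List(b, c))
}
```

In this model `FanOutExample.names(FanOutExample.a)` holds both "b" and "c", while each mapped branch sees only its own name. That is the ambiguity: once a node fans out, "one name per node" cannot hold under the builder style.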

I always forget how things work, but what should .name("a").name("b") do? Just a? Just b? If it is just b then what if someone passes you something with a name and they expect it to be applied?

A main use case needs to be the ability to set different options on different nodes. If we can't do that somehow, something probably needs to change, and we just need to help people migrate.

There is a lot of complex stuff.

My approach was this: you consider names to be on "intrinsics" of the graph: sources, sinks, services, stores and functions. That is all there is. The nodes are incidental. Next, you keep the original graph to look up the set of names to be applied to an intrinsic (an option may apply to a physical node property, and many intrinsics may be on the same node). Lastly, we should have a partial ordering on names. This can be done by looking at the transitive parents of a name. If that set is a superset of another name's set, the name comes after the other; otherwise, the two are incomparable. With this, you can make a DAG of names. We could give that DAG of names for an intrinsic to the Option, and the option could decide how to aggregate the values. The default aggregation can be to just take the first value found going top-down in the DAG.
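The partial ordering and default aggregation described above could be sketched roughly as follows. This is an illustration under stated assumptions, not an existing Summingbird API: `NameOrdering`, `Parents`, `tryCompare`, `topDown`, and `firstTopDown` are hypothetical names, the transitive parents of each name are assumed to be precomputed as sets of intrinsic ids, and breaking ties between incomparable names alphabetically is an arbitrary choice made here only to keep the result deterministic.

```scala
object NameOrdering {
  // Hypothetical input: for each name, the ids of the intrinsics that are
  // transitive parents of the point where the name was attached.
  type Parents = Map[String, Set[String]]

  // Partial order on names by set inclusion of their transitive parents:
  // Some(-1) if x comes before y, Some(1) if after, Some(0) if the sets are
  // equal, None if neither set contains the other (incomparable).
  def tryCompare(parents: Parents)(x: String, y: String): Option[Int] = {
    val px = parents(x)
    val py = parents(y)
    if (px == py) Some(0)
    else if (px.subsetOf(py)) Some(-1)
    else if (py.subsetOf(px)) Some(1)
    else None
  }

  // One top-down linearization of the name DAG. A strict subset always has
  // fewer elements, so sorting by set size respects the partial order.
  // Assumption: incomparable names are ordered alphabetically.
  def topDown(parents: Parents, names: Set[String]): List[String] =
    names.toList.sortBy(n => (parents(n).size, n))

  // Default aggregation: the first value found going top-down.
  def firstTopDown[T](parents: Parents, values: Map[String, T]): Option[T] =
    topDown(parents, values.keySet).headOption.map(values)
}

object NameOrderingExample {
  // Names from the failing test: "flatMapper" sits above "summer".
  val parents: NameOrdering.Parents = Map(
    "flatMapper" -> Set("source", "flatMap"),
    "summer" -> Set("source", "flatMap", "summer"),
    // Two fan-out branches whose parent sets do not contain each other.
    "b" -> Set("a", "g"),
    "c" -> Set("a", "h")
  )

  // Names reaching the summer intrinsic: only "summer" was attached below it.
  val summerValue: Option[Int] =
    NameOrdering.firstTopDown(parents, Map("summer" -> 20))

  // Names reaching the flatMap function: both names were attached below it.
  val flatMapValue: Option[Int] =
    NameOrdering.firstTopDown(parents, Map("flatMapper" -> 10, "summer" -> 20))
}
```

Under these assumptions the summer intrinsic resolves to 20 and the flatMap function to 10, which is the outcome the failing test expects. The pair `b`/`c` comes back as incomparable, which is the case where an Option would need its own aggregation rule.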

Anyway. I think step 1 is more tests on what the expectations are, step 2 is making sure we can meet all those expectations.