twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0

Test to illustrate https://github.com/twitter/summingbird/issues/671 #672

Open · pankajroark opened 8 years ago

pankajroark commented 8 years ago

To illustrate the issue in #671.

johnynek commented 8 years ago

We should check whether this is also a bug in Scalding.

We should have common code that does the naming based only on the Producer graph (and maybe on the items inside the graph that get the names, so we can actually translate the names).

I tried to do this in the past, but it was reverted.

pankajroark commented 8 years ago

I added a few named-option tests for Scalding, including one for this case. This bug doesn't seem to affect the Scalding platform.

pankajroark commented 8 years ago

Added a proposed fix, which is a single-word change in StormPlatform's get function. The explanation is a bit involved, so here it is. The members of a SummerNode are initially stored as: Summer(IdentityKeyedProducer(FlatMappedProducer....))), IdentityKeyedProducer(FlatMappedProducer....))

However, at https://github.com/twitter/summingbird/blob/develop/summingbird-online/src/main/scala/com/twitter/summingbird/planner/OnlinePlan.scala#L230 the order is reversed.

So node.members.last returns IdentityKeyedProducer(FlatMappedProducer....)).

When the irreducible analysis is done, this IdentityKeyedProducer doesn't cover the Store used in the summer, so during the reverse mapping it ends up mapped to both the flatMapper and the summer named nodes.

Thus we end up looking at the options of both the flatMapper and the summer named nodes, and we find the flatMapper setting first.

By using node.members.head, we use Summer(IdentityKeyedProducer(FlatMappedProducer....))) for the lookup, correctly find only the summer named node, and therefore look up options only there.
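A minimal Scala sketch of the lookup described above. Every name in it (Prod, FlatMapped, IdentityKeyed, Summer, namesOf, parallelismByName, lookup) is a hypothetical, simplified stand-in, not summingbird's real classes or StormPlatform's get. It assumes, as the comment explains, that the reverse mapping attributes the IdentityKeyedProducer to both named nodes (flatMapper first) and the Summer only to the summer node, and it shows why last and head give different answers.

```scala
// Hypothetical, simplified stand-ins for summingbird producers.
// They only model the shape of the members list discussed above.
sealed trait Prod
final case class FlatMapped(label: String) extends Prod
final case class IdentityKeyed(parent: Prod) extends Prod
final case class Summer(parent: Prod) extends Prod

val fm = FlatMapped("fm")
val idKeyed = IdentityKeyed(fm)
val summer = Summer(idKeyed)

// Order in which the SummerNode holds its members, per the comment.
val summerMembers: List[Prod] = List(summer, idKeyed)

// Assumed result of the reverse mapping: the named nodes each producer
// is attributed to, in the order they are consulted.
val namesOf: Map[Prod, List[String]] = Map(
  idKeyed -> List("flatMapper", "summer"), // ambiguous: covered by both names
  summer  -> List("summer")                // covered only by the summer name
)

// Hypothetical per-name parallelism settings.
val parallelismByName: Map[String, Int] = Map("flatMapper" -> 2, "summer" -> 10)

// Return the first setting found among the names attributed to `p`.
def lookup(p: Prod): Option[Int] =
  namesOf.getOrElse(p, Nil).flatMap(parallelismByName.get).headOption
```

In this model, lookup(summerMembers.last) picks up the flatMapper setting (Some(2)), while lookup(summerMembers.head) finds only the summer setting (Some(10)).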

NPraneeth commented 8 years ago

While this fix solves this bug, I think it creates a new one.

Let's suppose we have Source(_).name("dummySrc").map(fn).name("dummyOpt").sumByKey(_*) with Options.set("dummySrc"->SummerParallelism(10)). This gets converted to NamedProducer(OptionMappedProducer(NamedProducer(Source))), and the SourceNode will contain the members OptionMappedProducer, Source.

When you take node.members.head, you get OptionMappedProducer's options instead of the SourceProducer's options. So node.members.head might work for the SummerNode but will fail for the SourceNode: the same problem, the other way around.
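The counterexample can be sketched the same way. Again, Prod, Src, OptionMapped, namesOf, summerParallelism and lookup are hypothetical simplifications, not summingbird's API. For the model it assumes that a name applies to the producers upstream of it, so the Source sits under both "dummySrc" and "dummyOpt" while the OptionMappedProducer sits only under "dummyOpt".

```scala
// Hypothetical, simplified model (not the real summingbird types).
sealed trait Prod
final case class Src(label: String) extends Prod
final case class OptionMapped(parent: Prod) extends Prod

val src = Src("events")
val optMapped = OptionMapped(src)

// SourceNode members in the order described in the comment.
val sourceMembers: List[Prod] = List(optMapped, src)

// Assumed name attribution: a name covers the producers upstream of it.
val namesOf: Map[Prod, List[String]] = Map(
  optMapped -> List("dummyOpt"),
  src       -> List("dummySrc", "dummyOpt")
)

// Options.set("dummySrc" -> SummerParallelism(10)) modeled as a plain map.
val summerParallelism: Map[String, Int] = Map("dummySrc" -> 10)

// Return the first setting found among the names attributed to `p`.
def lookup(p: Prod): Option[Int] =
  namesOf.getOrElse(p, Nil).flatMap(summerParallelism.get).headOption
```

Here head (the OptionMappedProducer) finds no setting (None), whereas last (the Source) finds the "dummySrc" setting (Some(10)): the mirror image of the SummerNode case.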

pankajroark commented 8 years ago

@NPraneeth good catch.

NPraneeth commented 8 years ago

@pankajroark Since the code for the get method is in StormPlatform, we can use case statements to check whether the node is a SourceNode or a SummerNode, collect the SourceProducer/SummerProducer, and then grab the options for that producer. This would be better than grabbing the options for the last member of the node. For a FlatMapNode we can keep node.members.last as the default. For example, for SummerNode -> Summer(IdentityKey(...., instead of using node.members.last (the IdentityKeyedProducer in this case), we can recognize it as a SummerNode and grab the options for the SummerProducer instead. The same applies to the SourceNode.
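A rough sketch of the proposed pattern match, assuming simplified node and producer types. The names Node, SourceNode, SummerNode, FlatMapNode, Src, Summer and lookupProducer here are hypothetical models, not StormPlatform's actual get signature. Falling back to members.last when no matching producer is found is also an assumption added for the sketch.

```scala
// Hypothetical, simplified producer types (not the real summingbird classes).
sealed trait Prod
final case class Src(label: String) extends Prod
final case class OptionMapped(parent: Prod) extends Prod
final case class IdentityKeyed(parent: Prod) extends Prod
final case class Summer(parent: Prod) extends Prod

// Hypothetical node types, each holding its members in planner order.
sealed trait Node { def members: List[Prod] }
final case class SourceNode(members: List[Prod]) extends Node
final case class SummerNode(members: List[Prod]) extends Node
final case class FlatMapNode(members: List[Prod]) extends Node

// Pick the producer whose options should be consulted for a node:
// the source producer for a SourceNode, the summer for a SummerNode,
// and the last member (the existing default) for a FlatMapNode.
def lookupProducer(node: Node): Prod = node match {
  case SourceNode(ms)  => ms.collectFirst { case s: Src => s }.getOrElse(ms.last)
  case SummerNode(ms)  => ms.collectFirst { case s: Summer => s }.getOrElse(ms.last)
  case FlatMapNode(ms) => ms.last
}
```

Matching on the node type makes the choice independent of member order, which is exactly what broke when switching between head and last.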

CLAassistant commented 5 years ago

Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.