twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0

Performance of List semigroup in state aggregation is slow #689

Closed pankajroark closed 7 years ago

pankajroark commented 7 years ago

Summingbird online uses lists for storing the state of each key in its cache. Ultimately, the value being summed (at least in SyncSummingQueue) looks like Map[Key, (Seq[InputState], Value)]. Doing plus on the List semigroup (List is the Seq implementation summingbird uses in Summer and FinalFlatMap) is therefore very common and sits on the critical path of summingbird online. This semigroup is very slow because semigroup.plus(existingValue, newValue) boils down to oldList ++ List(newValue).

oldList is typically the bigger one, so the cost of the addition above is proportional to the size of the bigger list. If we accumulate one value at a time, the total cost is O(n^2) rather than O(n).
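To make the asymptotics concrete, here is a minimal standalone sketch (not summingbird code) contrasting per-element append with prepend on List:

```scala
// Sketch (not summingbird code): why one-at-a-time List append is O(n^2).
// List is a singly linked list, so `++` must copy its entire left operand.
val n = 10000

// Append: step k copies the k accumulated elements => O(1 + 2 + ... + n) = O(n^2).
val appended = (1 to n).foldLeft(List.empty[Int])((acc, x) => acc ++ List(x))

// Prepend: each step allocates a single cons cell => O(n) total.
val prepended = (1 to n).foldLeft(List.empty[Int])((acc, x) => x :: acc)
```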

I've seen this show up in YourKit profiles.

Aggregating in batches doesn't help, because the top-level value being summed is a Map[Key, (List[State], Value)] and GenericMapMonoid's sumOption doesn't call sumOption on the map's underlying values. So plus still ultimately gets called on the list of state.
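For context, a map monoid's plus combines the values under each key pairwise with the value semigroup, so even a top-level sumOption only reaches the inner lists through per-key plus calls. A simplified sketch of that shape (not algebird's actual code):

```scala
// Simplified sketch (not algebird's actual code) of how a map monoid's plus
// behaves: values under the same key are combined pairwise by the value
// semigroup, so batching at the map level never batches the inner values.
def plusMaps[K, V](x: Map[K, V], y: Map[K, V])(plusV: (V, V) => V): Map[K, V] =
  y.foldLeft(x) { case (acc, (k, v)) =>
    acc.updated(k, acc.get(k).fold(v)(plusV(_, v)))
  }
```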

One option is to change the data structure used for storing the list of state; we could use a Vector, for example.

Another possible solution is to provide a different implementation of the List semigroup used for combining state lists. Changing the order would fix it, for example: List(newValue) ++ oldList is fast, and for state the order doesn't matter much. We would acknowledge newer tuples first, but that shouldn't matter; afaik nothing relies on acking order. It could potentially cause tuples to time out, but timeouts are typically large enough that ordering within a single batch doesn't matter. A sketch of such a reordered semigroup is below.
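Such a reordered semigroup could look like this (hypothetical code, assuming algebird's Semigroup trait):

```scala
import com.twitter.algebird.Semigroup

// Hypothetical reordered List semigroup: prepend the (typically much smaller)
// right argument, so each plus costs O(size of the new list) instead of
// O(size of the accumulated list). Note that accumulated state ends up in
// reverse order relative to `older ++ newer`.
def prependingListSemigroup[T]: Semigroup[List[T]] =
  new Semigroup[List[T]] {
    def plus(older: List[T], newer: List[T]): List[T] = newer ++ older
  }
```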

Looking to hear thoughts on this.

johnynek commented 7 years ago

See Batched:

https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/Batched.scala

Wrapping with that type will force the use of sumOption and I think it can address the concerns here (it fixes the O(N^2) for sure). Try it and see if that fixes it in YourKit.
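Roughly, the idea behind Batched is that plus only allocates an O(1) tree node, and the underlying semigroup does the real work once, when the batch is collapsed. A hand-rolled simplification of that idea (not algebird's actual Batched API; see the linked file for that):

```scala
import com.twitter.algebird.Semigroup

// Hand-rolled simplification of the idea behind Batched (not its actual API):
// plus allocates an O(1) tree node; the wrapped semigroup is only consulted
// once, via sumOption, when the batch is finally collapsed.
sealed trait MiniBatched[T] {
  def sum(implicit sg: Semigroup[T]): T = {
    def leaves(b: MiniBatched[T]): List[T] = b match {
      case Item(t)        => List(t)
      case Combined(l, r) => leaves(l) ::: leaves(r)
    }
    sg.sumOption(leaves(this)).get // safe: a MiniBatched is never empty
  }
}
final case class Item[T](t: T) extends MiniBatched[T]
final case class Combined[T](l: MiniBatched[T], r: MiniBatched[T]) extends MiniBatched[T]

def miniBatchedSemigroup[T]: Semigroup[MiniBatched[T]] =
  new Semigroup[MiniBatched[T]] {
    def plus(l: MiniBatched[T], r: MiniBatched[T]): MiniBatched[T] = Combined(l, r)
  }
```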


johnynek commented 7 years ago

Wait, how is ArrayDeque less memory than Batched?

Do we ever actually have lazily constructed things there? I think we have always materialized all of them anyway.

Pankaj Gupta wrote (in a since-deleted comment):

Batched will keep things in memory and thus increase the memory footprint. A lot of the jobs are memory constrained; there is not much space available to trade for time. IMO, the best option would be to use a different data type for Seq[InputState], perhaps java.util.ArrayDeque[InputState] or something similar with fast append.
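For reference, the quoted ArrayDeque suggestion amounts to something like the following sketch (hypothetical, not summingbird code):

```scala
import java.util.ArrayDeque
import scala.collection.JavaConverters._

// Hypothetical sketch (not summingbird code): a mutable deque gives amortized
// O(1) append without the per-plus copying that List `++` incurs.
final class StateBuffer[S] {
  private val deque = new ArrayDeque[S]()
  def append(s: S): Unit = deque.addLast(s)   // amortized O(1)
  def iterator: Iterator[S] = deque.iterator().asScala
}
```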


johnynek commented 7 years ago

Isn't this the input state we are keeping? Doesn't that have to stay materialized in memory?


pankajroark commented 7 years ago

Yes, I deleted my comment when I realized the same :). I've used an iterator now.


pankajroark commented 7 years ago

We ended up replacing the List with a Chain to fix this.
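For reference, the core idea of a Chain-style structure is O(1) concatenation: ++ just allocates a node, and the elements are only walked when the chain is linearized. A minimal sketch (not the actual implementation summingbird adopted):

```scala
// Minimal sketch of a Chain-style structure (not the actual implementation
// summingbird adopted): ++ is O(1) because it only allocates a node; the
// elements are walked just once, when the chain is linearized.
sealed trait MiniChain[+T] {
  def ++[U >: T](that: MiniChain[U]): MiniChain[U] = Concat(this, that)
  def toList: List[T] = {
    def go(c: MiniChain[T], acc: List[T]): List[T] = c match {
      case One(t)       => t :: acc
      case Concat(l, r) => go(l, go(r, acc))
    }
    go(this, Nil)
  }
}
final case class One[T](t: T) extends MiniChain[T]
final case class Concat[T](l: MiniChain[T], r: MiniChain[T]) extends MiniChain[T]
```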