twitter / summingbird

Streaming MapReduce with Scalding and Storm
https://twitter.com/summingbird
Apache License 2.0

Track state in summingbird-online as an Iterator rather than a Seq. #703

Closed pankajroark closed 7 years ago

pankajroark commented 7 years ago

Fix for #689. This should avoid O(n^2) compute complexity when summing single-element lists of Storm tuples.
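A minimal sketch of where the quadratic cost can come from, assuming the state is accumulated by appending single-element lists to an immutable List with `++`. The object and method names here are hypothetical and are not taken from summingbird. Each append copies the whole left-hand list, so n appends copy roughly n^2 / 2 elements in total.

```scala
object AppendCost {
  // Number of elements copied when n single-element lists are appended one by one:
  // the i-th append copies the i elements accumulated so far, giving n * (n - 1) / 2.
  def copiesForListAppends(n: Int): Long =
    (0 until n).foldLeft(0L) { (copied, size) => copied + size }

  // Builds the list the slow way: `acc ++ List(i)` is O(length of acc) per step.
  def appendOneByOne(n: Int): List[Int] =
    (0 until n).foldLeft(List.empty[Int]) { (acc, i) => acc ++ List(i) }

  // Builds the same list in O(n) total: constant-time prepend, then one reverse.
  def prependThenReverse(n: Int): List[Int] =
    (0 until n).foldLeft(List.empty[Int]) { (acc, i) => i :: acc }.reverse
}
```

Both builders return the same list; only the amount of copying differs.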

johnynek commented 7 years ago

Could we use Stream instead? The mutability of an Iterator makes it pretty costly to verify in a code review that it is not buggy. I really hate to have a mutable object on an API unless the performance really necessitates it.
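To illustrate the mutability concern, here is a small sketch (hypothetical names, not code from this PR) showing that an Iterator can only be traversed once, while a Stream yields the same elements on every pass. In Scala 2.13 and later, Stream is deprecated in favor of LazyList, but it behaves the same way for this purpose.

```scala
object IteratorVsStream {
  // Sums whatever is left in the iterator, consuming it in the process.
  private def drain(it: Iterator[Int]): Int = {
    var total = 0
    while (it.hasNext) total += it.next()
    total
  }

  // Traverses the same Iterator twice; the second pass finds it exhausted.
  def drainTwiceIterator(xs: List[Int]): (Int, Int) = {
    val it = xs.iterator
    val first = drain(it)
    val second = drain(it)
    (first, second)
  }

  // Traverses the same Stream twice; both passes see every element.
  def sumTwiceStream(xs: List[Int]): (Int, Int) = {
    val s = xs.toStream
    (s.sum, s.sum)
  }
}
```

A caller that accidentally reads the Iterator twice silently gets an empty second result, which is the kind of bug that is hard to spot in review.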

pankajroark commented 7 years ago

Good idea. Changed to use Stream instead of Iterator.

pankajroark commented 7 years ago

tbh Iterator is 4x faster than Stream in this microbenchmark, but both are way faster than List, so Stream seems fine:

val s = (0 to 10000).toList

@Benchmark def listConcat(): List[Int] = { s ++ List(0) }

@Benchmark def streamConcat(): Stream[Int] = { s.toStream ++ Stream(0) }

@Benchmark def iterConcat(): Iterator[Int] = { s.toIterator ++ Iterator.single(0) }

Results:

[info] ToBenchmark.iterConcat thrpt 4 85182247.256 ± 5840222.962 ops/s
[info] ToBenchmark.listConcat thrpt 4 9937.918 ± 26498.037 ops/s
[info] ToBenchmark.streamConcat thrpt 4 22019587.193 ± 7199199.008 ops/s

johnynek commented 7 years ago

I would bet Batched will be as fast or faster than Iterator.

pankajroark commented 7 years ago

I tried Batched but the code started getting complicated because of https://github.com/twitter/summingbird/blob/develop/summingbird-online/src/main/scala/com/twitter/summingbird/online/executor/AsyncBase.scala#L46

We need to be able to pass an empty Batched here. Batched itself doesn't have a zero value; it relies on the contained type. So we either have to use Option[Batched], which wouldn't be good for performance because we would always have to wrap and unwrap, or we have to have the monoid of S available in this abstract class, which would require further code changes.
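The Option-based alternative can be sketched as follows. `NonEmptyBatch` is a hypothetical, simplified stand-in for a Batched-like type that always holds at least one item; it is not the real Batched API. Lifting it into Option supplies the missing zero (None) at the cost of a wrap and unwrap on every combine.

```scala
object OptionLifting {
  // Hypothetical non-empty batch: there is no value that represents "nothing".
  sealed trait NonEmptyBatch[T] {
    // O(1) concatenation: just allocate a new node.
    def combine(that: NonEmptyBatch[T]): NonEmptyBatch[T] = Concat(this, that)

    // Flattens the tree left to right, using an explicit stack to stay stack-safe.
    def toList: List[T] = {
      val out = List.newBuilder[T]
      var pending: List[NonEmptyBatch[T]] = this :: Nil
      while (pending.nonEmpty) {
        val head = pending.head
        pending = pending.tail
        head match {
          case Item(value)  => out += value
          case Concat(l, r) => pending = l :: r :: pending
        }
      }
      out.result()
    }
  }
  final case class Item[T](value: T) extends NonEmptyBatch[T]
  final case class Concat[T](left: NonEmptyBatch[T], right: NonEmptyBatch[T])
      extends NonEmptyBatch[T]

  // Monoid-style combine on Option: None acts as the empty value.
  def combineOpt[T](
      a: Option[NonEmptyBatch[T]],
      b: Option[NonEmptyBatch[T]]): Option[NonEmptyBatch[T]] =
    (a, b) match {
      case (Some(x), Some(y)) => Some(x.combine(y))
      case (None, _)          => b
      case (_, None)          => a
    }
}
```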

pankajroark commented 7 years ago

Just for comparison, Iterator and Batched are indeed comparable in perf. I'd really like to use Batched if we can find a simple way, or Iterator otherwise. Combining input state is in the hot path:

val s = (0 to 10000).toList
val bs = Batched.items(s).get
val is = s.toIterator
val ss = s.toStream

@Benchmark def listConcat(): List[Int] = { s ++ List(0) }

@Benchmark def streamConcat(): Stream[Int] = { ss ++ Stream(0) }

@Benchmark def iterConcat(): Iterator[Int] = { is ++ Iterator.single(0) }

@Benchmark def batchedConcat(): Batched[Int] = { bs.combine(Batched(0)) }

Results:

[info] ToBenchmark.batchedConcat thrpt 4 126300963.122 ± 46804561.489 ops/s
[info] ToBenchmark.iterConcat thrpt 4 121149476.337 ± 46339829.598 ops/s
[info] ToBenchmark.listConcat thrpt 4 9380.640 ± 23140.126 ops/s
[info] ToBenchmark.streamConcat thrpt 4 24118157.945 ± 7172767.691 ops/s

pankajroark commented 7 years ago

Any suggestions on the next steps here? I'm OK with Stream; it's still much better than List in this case.

johnynek commented 7 years ago

I have not investigated why the tests are red.

If we can make them green with something faster, I'd be happy to do that.

I don't think using Batched would be too much work. Or we could copy this code or add this dependency:

https://github.com/non/chain/blob/master/src/main/scala/chain/Chain.scala

It is a single-file library that is the more general version of Batched (it has empty). It was written by @non, who also wrote Batched.

Do we really need an empty Batched? I would imagine that a Monoid[Option[Batched[T]]] would be almost as fast (still faster than Stream).

It is up to you. I think killing the O(N^2) is most important. Losing a constant factor of 4 is probably not a huge deal if you don't want to work on this other stuff.

Not using a mutable data structure is pretty important to me since this code has been worked on by many people now, and it is much easier to make a mistake with mutable APIs.

pankajroark commented 7 years ago

Thanks, let me try Chain. I agree, a constant factor of 4 is insignificant compared to n^2. Tests are failing the MiMa check.

johnynek commented 7 years ago

Can you add the exclusions so we can keep the tests green? The check prints what you need to add to the build to exclude those methods from erroring.
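For readers unfamiliar with MiMa exclusions, a build fragment along these lines is typical. This is only a sketch: it assumes the sbt-mima-plugin is enabled, the class and method name in the filter are hypothetical placeholders, and the setting key may differ in older plugin versions. The actual filters should be copied from what the MiMa check prints.

```scala
// Build definition fragment (sketch, not the change made in this PR).
import com.typesafe.tools.mima.core._

mimaBinaryIssueFilters ++= Seq(
  // Hypothetical name: replace with the exact filter MiMa reports for each method.
  ProblemFilters.exclude[DirectMissingMethodProblem]("com.example.SomeClass.someMethod")
)
```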

pankajroark commented 7 years ago

Sure, I'll take a look later today.

pankajroark commented 7 years ago

Chain seems good:

[info] ToBenchmark.batchedConcat thrpt 4 133738359.414 ± 3281958.077 ops/s
[info] ToBenchmark.chainConcat thrpt 4 90914620.450 ± 19299667.614 ops/s
[info] ToBenchmark.iterConcat thrpt 4 122448980.991 ± 38379171.178 ops/s
[info] ToBenchmark.streamConcat thrpt 4 26017386.004 ± 6404231.437 ops/s

I've updated the review to use Chain.

pankajroark commented 7 years ago

Tests pass now, just waiting for shipit to merge.

codecov-io commented 7 years ago

Current coverage is 70.96% (diff: 80.95%)

No coverage report found for develop at 9a80b22.

Powered by Codecov. Last update 9a80b22...a90c9da

johnynek commented 7 years ago

:+1:

ttim commented 7 years ago

What is the reason to expose Chain at the level of OperationContainer? I'm not strictly against that, but Traversable or even TraversableOnce seems more appropriate to me.

To be more precise: AsyncSummer instances should be over Chain to avoid N^2 complexity, while everything else should be over Traversable/TraversableOnce. What do you think?

johnynek commented 7 years ago

I think making a follow-up PR that minimizes the scope of visibility of Chain (maybe even walking back some of the MiMa exclusions) would be fine. But if we have to copy to do it, I would not.

For instance, you can go: Iterable[T] => Chain[T] but not Iterator[T] => Chain[T] without a copy.

So, maybe we could use Iterable[T] in some cases, and internally use a Chain[T] to do fast concat.
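A rough sketch of that idea, using a hypothetical `MiniChain` type rather than the real Chain API: the public signature only mentions Iterable[T], each input is wrapped in O(1) without copying, and concatenation is O(1) per step. Elements are only walked once, when the final result is materialized.

```scala
object ChainSketch {
  // Hypothetical, simplified chain: a tree whose leaves wrap existing Iterables.
  sealed trait MiniChain[+T] {
    // O(1) concatenation; Empty is the identity, so no node is allocated for it.
    def ++[U >: T](that: MiniChain[U]): MiniChain[U] =
      (this, that) match {
        case (Empty, _) => that
        case (_, Empty) => this
        case _          => Concat[U](this, that)
      }

    // Flattens left to right with an explicit stack, visiting each element once.
    def toList: List[T] = {
      val out = List.newBuilder[T]
      var pending: List[MiniChain[T]] = this :: Nil
      while (pending.nonEmpty) {
        val head = pending.head
        pending = pending.tail
        head match {
          case Empty        => ()
          case Wrap(items)  => out ++= items
          case Concat(l, r) => pending = l :: r :: pending
        }
      }
      out.result()
    }
  }
  case object Empty extends MiniChain[Nothing]
  // Wrapping an Iterable stores a reference; nothing is copied here.
  final case class Wrap[+T](items: Iterable[T]) extends MiniChain[T]
  final case class Concat[+T](left: MiniChain[T], right: MiniChain[T]) extends MiniChain[T]

  // Public-facing function: Iterable in, Iterable out, chain used only internally.
  def concatAll[T](batches: Seq[Iterable[T]]): Iterable[T] =
    batches.foldLeft(Empty: MiniChain[T]) { (acc, b) => acc ++ Wrap(b) }.toList
}
```

An Iterator could not be wrapped the same way, because reading it later would consume it, so it would have to be copied into a collection first.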

That said, these are fairly "private" classes in that 99% of summingbird users would never use them. In fact, likely only the Storm/Heron platform would use them.

ttim commented 7 years ago

I agree.

Regarding copying: we don't need an Iterator[T] => Chain[T] transformation because OperationContainer#execute accepts a single state element. But we do need a Chain[T] => Iterable[T] transformation, which is the same (in terms of copying) as Chain[T] => Iterator[T].

pankajroark commented 7 years ago

I agree that exposing Chain at OperationContainer is not ideal, and also that OperationContainer is used only in the Storm/Heron platform right now, so I feel it's better to keep it simple, as is, for now. Let me create an issue to capture this though.