Closed taiyang-li closed 2 years ago
@taiyang-li
Actually, the configuration method Stream.WithConcurrency simply updates and stores the concurrency value (default = 1) to be used by the next downstream operator.
For instance, in the following example, strm.WithConcurrency(4).Filter sets the concurrency for Filter to 4, and strm.WithConcurrency(2).Map sets the concurrency for Map to 2:
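package main

import (
	"fmt"
	"os"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	strm := stream.New([]rune("B世!ぽ@opq...XѬYZbcef7ghijCklrAstvw"))
	// WithConcurrency is the proposed method: it applies only to the next operator.
	strm.WithConcurrency(4).Filter(func(item rune) bool {
		return item >= 65 && item < (65+26) // keep uppercase ASCII letters only
	}).WithConcurrency(2).Map(func(item rune) string {
		return string(item)
	}).Batch().Sort()
	strm.Into(collectors.Writer(os.Stdout))
	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
		return
	}
}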
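To make the "applies to the next operator only" behavior concrete, here is a minimal, self-contained sketch. It does not use automi at all: miniStream, op, and add are hypothetical names invented for illustration, and the operators only record their configuration instead of processing data.

```go
package main

import "fmt"

// op is a hypothetical stand-in for a stream operator. It only records
// its name and the concurrency it was configured with.
type op struct {
	name    string
	concurr int
}

// miniStream is a toy model of the builder, not automi's Stream type.
type miniStream struct {
	concurr int // pending concurrency for the next operator
	ops     []op
}

func newMiniStream() *miniStream { return &miniStream{concurr: 1} }

// WithConcurrency stores the value to be used by the next operator only.
func (s *miniStream) WithConcurrency(n int) *miniStream {
	if n < 1 {
		n = 1 // assumption: values below 1 fall back to the default
	}
	s.concurr = n
	return s
}

// add registers an operator with the pending concurrency, then resets
// the pending value to the default of 1.
func (s *miniStream) add(name string) *miniStream {
	s.ops = append(s.ops, op{name: name, concurr: s.concurr})
	s.concurr = 1
	return s
}

func (s *miniStream) Filter() *miniStream { return s.add("Filter") }
func (s *miniStream) Map() *miniStream    { return s.add("Map") }
func (s *miniStream) Sort() *miniStream   { return s.add("Sort") }

func main() {
	s := newMiniStream()
	s.WithConcurrency(4).Filter().WithConcurrency(2).Map().Sort()
	for _, o := range s.ops {
		fmt.Printf("%s concurrency=%d\n", o.name, o.concurr)
	}
	// Prints:
	// Filter concurrency=4
	// Map concurrency=2
	// Sort concurrency=1
}
```

In this sketch, Sort ends up with the default of 1 because no WithConcurrency call precedes it, so each operator can be tuned individually.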
The WithConcurrency method can be defined here: https://github.com/vladimirvivien/automi/blob/464abb800463bcf784287f182f929f3a2fb8244e/stream/stream.go#L42
Then, at the following location, add logic to set the concurrency on the operator (inside the Transform method): https://github.com/vladimirvivien/automi/blob/464abb800463bcf784287f182f929f3a2fb8244e/stream/stream_unaryop.go#L11
func (s *Stream) Transform(op api.UnOperation) *Stream {
	operator := unary.New()
	operator.SetOperation(op)
	operator.SetConcurrency(s.concurr)
	s.concurr = 1 // reset to default
	s.ops = append(s.ops, operator)
	return s
}
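For context, here is one way an operator could honor the value it receives through SetConcurrency. This is only a sketch under assumptions, not the unary operator's actual implementation: runConcurrent is a hypothetical helper, and it works on plain int channels to stay self-contained.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runConcurrent is a hypothetical helper: it applies fn to every item
// read from in using n worker goroutines, and closes the returned channel
// once all workers have finished. Output order is not guaranteed when n > 1.
func runConcurrent(in <-chan int, n int, fn func(int) int) <-chan int {
	if n < 1 {
		n = 1 // assumption: fall back to the default concurrency
	}
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for item := range in {
				out <- fn(item)
			}
		}()
	}
	// Close out only after every worker has drained the input.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()

	var results []int
	for v := range runConcurrent(in, 4, func(x int) int { return x * x }) {
		results = append(results, v)
	}
	sort.Ints(results) // workers may emit out of order, so sort before printing
	fmt.Println(results) // [1 4 9 16 25]
}
```

One consequence worth noting: with more than one worker, items can leave the operator in a different order than they arrived, which matters for any downstream operator that depends on ordering.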
It looks like I started work on that a while back but removed it: https://github.com/vladimirvivien/automi/blob/c9e7f649bd79026a094a5666e309823b543a5d6b/operators/unary/unaryop.go
Let me know if that makes sense.
Thanks for your detailed explanation. I think I fully understand what you mean, and it is a very good design. I will update my PR within the next 1-2 weeks.
Does Stream.WithConcurrency mean that all the operators in the stream have the same concurrency? If so, it may be inconvenient to set the concurrency for a single operator.