Closed AmitKumarDas closed 2 years ago
// fan out - fan in - workers
// parallelize CPU & IO
//
// main wires up a small fan-out / fan-in pipeline: a single source channel is
// shared by two sq stages, and merge combines their outputs into one stream
// that is printed until it is closed.
func main() {
	src := gen(2, 3)

	// Fan out: both stages read from the same source channel, so each value
	// is handled by whichever stage receives it first.
	workerA := sq(src)
	workerB := sq(src)

	// Fan in: drain the merged stream; arrival order is not fixed.
	merged := merge(workerA, workerB)
	for v := range merged {
		fmt.Println(v) // 4 then 9, or 9 then 4
	}
}
// non-blocking merge [f5]
// the inputs are channels and the output is a channel as well
//
// merge fans in any number of input channels onto a single output channel.
//
// One forwarding goroutine is started per input; each copies values to the
// shared output until its input is closed. A further goroutine waits for all
// forwarders and then closes the output, so merge itself returns immediately.
// The relative order of values coming from different inputs is not defined.
//
// The output is unbuffered, so the caller has to keep receiving from it;
// otherwise forwarders with pending values stay blocked on their sends.
func merge(cs ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	for _, src := range cs {
		pending.Add(1)
		// One forwarder per input channel.
		go func(src <-chan int) {
			defer pending.Done()
			for v := range src {
				merged <- v
			}
		}(src)
	}

	// Close the output only after every forwarder has finished. Doing the
	// wait in the background is what keeps merge from blocking its caller.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
// LEAK
// if consumer fails to consume all the inbound values
// goroutines attempting to send values will block indefinitely
//
// Pseudocode sketch (not compilable Go): a consumer that abandons the merged
// stream after reading a single value.
main() {
...
// out carries the values forwarded from both c1 and c2
out := merge(c1, c2)
// Receive only the first value; nothing reads from out after this point,
// even though merge's forwarding goroutines may still have values to deliver.
fmt.Println(<-out)
// Those goroutines remain blocked on their send to out because no receiver
// is left. For as long as the program keeps running they cannot finish,
// which is the leak this sketch illustrates.
return
}
// basics
// what is goroutine leak?
//
// goroutines consume memory and runtime resources
//
// heap references in goroutine stacks
// keep data from being garbage collected
//
// goroutines are not GC-ed
// they must exit on their own
// Buffered channel: sends succeed without a receiver until the buffer is full.
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately (buffer holds 1 of 2)
c <- 2 // succeeds immediately (buffer is now full)
c <- 3 // blocks until some goroutine does <-c and receives 1
// Receiving from a nil channel blocks forever
//
// Receive operation on a closed channel can always
// proceed immediately
// yielding the element type's zero value
// if the channel's buffer can hold every value, the sends need no goroutine
// gen returns a receive-only channel that already holds every value of nums,
// in order, and is closed.
//
// The channel is buffered with capacity len(nums), so all sends complete
// without a receiver and no goroutine is needed to feed it. A closed channel
// still delivers its buffered values before reporting that it is closed.
func gen(nums ...int) <-chan int {
	ch := make(chan int, len(nums))
	for i := 0; i < len(nums); i++ {
		ch <- nums[i]
	}
	close(ch)
	return ch
}
// genc starts a goroutine that reads ints from in and sends their squares on
// the returned channel.
//
// The goroutine stops, and the returned channel is closed, when either in is
// closed and drained, or done is closed while a square is waiting to be
// sent. Cancellation is only checked at the send; a goroutine waiting to
// receive from in does not watch done.
func genc(done <-chan struct{}, in <-chan int) <-chan int {
	squares := make(chan int)

	go func() {
		// This goroutine owns squares, so it is the one that closes it.
		defer close(squares)

		for {
			n, ok := <-in
			if !ok {
				return // input exhausted
			}
			select {
			case squares <- n * n:
			case <-done:
				// Downstream gave up; stop instead of blocking on the send.
				return
			}
		}
	}()

	return squares
}
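// try-receive: a select with a default case, so the receive never blocks
// try exit: poll stopCh with a try-receive to leave the loop as early as possible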
// main demonstrates gracefully stopping many senders and many receivers that
// share one data channel, without ever closing that data channel.
//
// Any sender or receiver may request shutdown by try-sending its name on
// toStop. A single moderator goroutine takes the first request, records who
// made it in stoppedBy, and closes stopCh. Every worker watches stopCh and
// returns once it is closed. main waits for all receivers, then logs the
// name of the goroutine that triggered the stop.
func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	const (
		Max          = 100000
		NumReceivers = 10
		NumSenders   = 1000
	)

	var wgReceivers sync.WaitGroup
	wgReceivers.Add(NumReceivers)

	dataCh := make(chan int)
	// stopCh is closed (never sent on) to broadcast "stop" to every worker.
	stopCh := make(chan struct{})
	// toStop has capacity 1 so that the first stop request can be deposited
	// even if the moderator has not reached its receive yet; later requests
	// are dropped by the try-sends below.
	toStop := make(chan string, 1)
	var stoppedBy string

	// Moderator: the only goroutine that closes stopCh, so it is closed
	// exactly once no matter how many workers ask for a stop.
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// Senders.
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// Try-send: ask the moderator to stop everything.
					// The default case keeps this from blocking when
					// another request is already queued.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}

				// Try-receive first so the sender exits as early as
				// possible. Without it, the select below could keep
				// picking the dataCh send for some iterations even
				// after stopCh is closed, because select chooses
				// randomly among ready cases.
				select {
				case <-stopCh:
					return
				default:
				}

				select {
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// Receivers.
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// Same early-exit try-receive as in the senders: it
				// gives stopCh priority over a ready dataCh receive.
				select {
				case <-stopCh:
					return
				default:
				}

				select {
				case <-stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// Try-send: a receiver may also ask the
						// moderator to stop everything.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}
					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	wgReceivers.Wait()

	// Receiving from the closed stopCh orders the moderator's write to
	// stoppedBy before the read below. Wait alone does not guarantee that:
	// if every receiver left through the Max-1 branch, none of them has
	// observed the close. This cannot block forever, because a receiver only
	// exits after stopCh is closed or after a value has been placed on
	// toStop, which makes the moderator close stopCh.
	<-stopCh
	log.Println("stopped by", stoppedBy)
}