Lehirt / no_idea

0 stars 0 forks source link

go channel #5

Open Lehirt opened 2 years ago

Lehirt commented 2 years ago

在Go语言中,每一个并发的执行单元叫作一个goroutine。

网络编程是并发大显身手的一个领域,服务器是最典型的需要同时处理很多连接的程序,这些连接一般来自于彼此独立的客户端。使用goroutine不仅可以并发处理多个连接,也可以在单个连接中使用并发。需要慎重考虑单个连接中的并发安全问题。

除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个goroutine来打断另一个的执行,但是可以通**过goroutine之间的通信来让一个goroutine请求其它的goroutine,并让被请求的goroutine自行结束执行。

如果说goroutine是Go语言程序的并发体的话,那么channels则是它们之间的通信机制。一个channel是一个通信机制,它可以让一个goroutine通过它给另一个goroutine发送值信息。每个channel都有一个特殊的类型,也就是channels可发送数据的类型。一个可以发送int类型数据的channel一般写为chan int。

一个channel有发送和接受两个主要操作,都是通信行为。一个发送语句将一个值从一个goroutine通过channel发送到另一个执行接收操作的goroutine。发送和接收两个操作都使用<-运算符。在发送语句中,<-运算符分割channel和要发送的值。在接收语句中,<-运算符写在channel对象之前。一个不使用接收结果的接收操作也是合法的。

ch <- x  // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch     // a receive statement; result is discarded

Channel支持close操作,用于关闭channel,随后对基于该channel的任何发送操作都将导致panic异常。对一个已经被close过的channel进行接收操作依然可以接受到之前已经成功发送的数据;如果channel中已经没有数据的话将产生一个零值的数据。

使用内置的close函数就可以关闭一个channel:

close(ch)

以最简单方式调用make函数创建的是一个无缓存的channel,但是我们也可以指定第二个整型参数,对应channel的容量。如果channel的容量大于零,那么该channel就是带缓存的channel。

ch = make(chan int)    // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
Lehirt commented 2 years ago

一个基于无缓存channel的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的Channels上执行接收操作,当发送的值通过Channels成功传输之后,两个goroutine可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直到有另一个goroutine在相同的Channels上执行发送操作。

基于无缓存Channels的发送和接收操作将导致两个goroutine做一次同步操作。因为这个原因,无缓存Channels有时候也被称为同步Channels。存在以下happens before关系:当通过一个无缓存Channels发送数据时,接收者收到数据发生在再次唤醒发送者goroutine之前。

在讨论并发编程时,当我们说x事件在y事件之前发生(happens before),我们并不是说x事件在时间上比y时间更早;我们要表达的意思是要保证在此之前的事件都已经完成了,例如在此之前的更新某些变量的操作已经完成,你可以放心依赖这些已完成的事件了。当我们说x事件既不是在y事件之前发生也不是在y事件之后发生,我们就说x事件和y事件是并发的。这并不是意味着x事件和y事件就一定是同时发生的,我们只是不能确定这两个事件发生的先后顺序。

//gopl.io/ch8/netcat3
func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    done := make(chan struct{})
    go func() {
        io.Copy(os.Stdout, conn) // NOTE: ignoring errors
        log.Println("done")
        done <- struct{}{} // signal the main goroutine
    }()
    mustCopy(conn, os.Stdin)
    conn.Close()
    <-done // wait for background goroutine to finish
}

基于channels发送消息有两个重要方面。首先每个消息都有一个值,但是有时候通讯的事实和发生的时刻也同样重要。当我们更希望强调通讯发生的时刻时,我们将它称为消息事件。有些消息事件并不携带额外的信息,它仅仅是用作两个goroutine之间的同步,这时候我们可以用struct{}空结构体作为channels元素的类型,虽然也可以使用bool或int类型实现同样的功能,done <- 1语句也比done <- struct{}{}更短。

Lehirt commented 2 years ago

易并行问题:程序正确性不依赖处理结果的顺序。这种程序最容易改成并行的,但是要注意 goroutine 的泄露 ( 指一个 goroutine 因为死锁 / channel 阻塞导致永远不会退出 )。

//创建已知大小的缓存channel
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
    type item struct {
        thumbfile string
        err       error
    }

    ch := make(chan item, len(filenames))
    for _, f := range filenames {
        go func(f string) {
            var it item
            it.thumbfile, it.err = thumbnail.ImageFile(f)
            ch <- it
        }(f)
    }

    for range filenames {
        it := <-ch
        if it.err != nil {
            return nil, it.err
        }
        thumbfiles = append(thumbfiles, it.thumbfile)
    }

    return thumbfiles, nil
}
//直接读取阻塞channel
func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup // number of working goroutines
    for f := range filenames {
        wg.Add(1)
        // worker
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb) // OK to ignore error
            sizes <- info.Size()
        }(f)
    }

    // closer
    go func() {
        wg.Wait()
        close(sizes)
    }()

    var total int64
    for size := range sizes {
        total += size
    }
    return total
}
//死锁场景
//fatal error: all goroutines are asleep - deadlock!
//   goroutine 1 [chan receive]:
//   main.main()
//   /go-demo/main.go:** +0x145

func main() {
    ch := make(chan int)
    for i := 0; i < 10; i++ {
        go func(i int) {
            fmt.Printf("worker goroutine ===== %d \n", i)
            ch <- i
        }(i)
    }

    for str := range ch {
        fmt.Println(str)
    }
}
Lehirt commented 2 years ago

对资源的限制访问。

有缓存的 channel 可以充当信号量的概念,即使有很多goroutine,也能有目的的放行对资源的访问。

// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)

func gofunc ()  {
    tokens <- struct{}{} // acquire a token
    do something 
    <-tokens // release the token
}

也可以启动固定数量的 goroutine ,使用无阻塞的 channel 在 main goroutine 中唤醒陷入阻塞的 worker goroutine

func main() {
    upupup := make(chan string) // de-duplicated URLs

    // Create 20 crawler goroutines to fetch each unseen link.
    for i := 0; i < 20; i++ {
        go func() {
            for up := range upupup {
                do 
            }
        }()
    }

    for  re := range resource {
          upupup <- "up"
    }
}
Lehirt commented 2 years ago

当 goroutine 在某个节点需要从不同的 channel 判断走向时,使用 select 实现多路复用。比如监听 goroutine 过期事件。

func main() {
    // ...create abort channel...

    fmt.Println("Commencing countdown.  Press return to abort.")
    select {
    case <-time.After(10 * time.Second):
        // Do nothing.
    case <-abort:
        fmt.Println("Do aborted!")
        return
    }
    do() // after 10 second complete something
}

select 通常与 for 循环一起使用

func main() {
     done := make(chan struct{})

     go func() {
          time.Sleep(5 * time.Second)
          close(done)
     }()

loop:
    for {
          select {
          case <-done:
                fmt.Println("done")
                break loop    //退出loop循环,单纯的break无法退出for循环
          default:    //轮询channel
          }
    }
}

Go语言并没有提供在一个 goroutine 中终止另一个 goroutine 的方法,由于这样会导致 goroutine 之间的共享变量落在未定义的状态上。可以让 goroutine 监听一个阻塞的 cancel chan ,如果可以读取 cancel chan便让 goroutine 退出。针对发出 cancel chan 信号的 goroutine 来说,确认某时刻有多少存在的 goroutine 并发送准确数量的 cancel 信号是非常困难的。为了能够实现退出所有需要关闭的 goroutine ,我们需要更可靠的策略,通过一个 channel 将消息广播出去,这样所有的 goroutine 都能够看到这条事件消息,并且在看到这条事件消息时,通过 happen-before 规则可以知道退出信号已经产生了。

当一个 channel 被关闭以后,所有写 channel 操作都将 panic ,所有读 channel 操作都会消耗 channel 中的值,当值被消耗完时,读 channel 不会阻塞,而是读到一个零值并继续运行 goroutine 。我们的广播机制即基于此:定义一个 cancel chan ,这个chan 仅用于广播关闭信号,当某个 goutine 关闭了这个 chan 时,所有读取这个 chan 的 goutine 便知道关闭信号已经到来,可以退出 goutine 了。即不要向 channel 发送值,而是用关闭一个 channel 来进行广播