penglongli / blog

18 stars 1 forks source link

golang 的 channel #58

Open penglongli opened 6 years ago

penglongli commented 6 years ago

参考文档:

无缓冲 channel

有一天某个员工离职,需要经过三个步骤:

这样整个步骤是一个串行的,按照写串行程序的思路来说:首先员工签字,领导、HR 等待;然后领导签字,HR 等待;最后 HR 签字。

下边我们用无缓冲 channel 来模拟并发的过程,相互之间不需要一直等待:

package main

import (
    "fmt"
    "time"
    "sync"
)

func Role(role string, c chan int) {
    for {
        fmt.Printf("    %s is working\n", role)

        select {
        case step := <-c:
            fmt.Printf("%s start process\n", role)
            time.Sleep(3 * time.Second)

            newStep := step + 1
            c <- newStep
        default:
        }

        time.Sleep(1500 * time.Millisecond)
    }
}

func scheduler(process chan int, cs... chan int) {
    defer wg.Done()

    step := <- process
    newStep := step
    if step == 1 {
        cs[0] <- step
        newStep = <- cs[0]

        go scheduler(process, cs...)
    }
    if step == 2 {
        cs[1] <- step
        newStep = <- cs[1]

        go scheduler(process, cs...)
    }
    if step == 3 {
        cs[2] <- step
        newStep = <- cs[2]

        go scheduler(process, cs...)
    }

    process <- newStep
}

var (
    wg sync.WaitGroup
)

func main() {
    wg.Add(3)
    c1, c2, c3 := make(chan int), make(chan int), make(chan int)

    go Role("Employee", c1)
    go Role("Leader", c2)
    go Role("Hr", c3)

    process := make(chan int)
    go scheduler(process, c1, c2, c3)

    fmt.Println("离职流程开启")
    process <- 1

    wg.Wait()
    fmt.Println("离职!")
}

输出:

离职流程开启
    Hr is working
    Leader is working
    Employee is working
Employee start process
    Hr is working
    Leader is working
    Leader is working
    Hr is working
    Hr is working
    Leader is working
    Employee is working
Leader start process
    Hr is working
    Employee is working
    Employee is working
    Hr is working
Hr start process
    Leader is working
    Employee is working
    Leader is working
    Employee is working
离职!

有缓冲 channel

有缓冲的 channel 和 Java 中的 BlockingQueue 很相似,其是一个阻塞的缓冲队列。下边的图可以说明:

我们使用“生产者-消费者”的方式来举例子:假设一个生产者生产 15 条消息,有两个消费者来对消息进行消费,其消费速度很慢。代码如下:

package main

import (
    "fmt"
    "time"
)

type Message struct {
    msg string
}

func Producer(queue chan Message) {
    for i := 1; i <= 15; i++ {
        msg := Message{
            msg: fmt.Sprintf("message %d", i),
        }
        queue <- msg
        fmt.Printf("Produced message %d\n", i)
    }
    fmt.Println("Producer completed!")
}

func Consumer(queue chan Message, num int) {
    for {
        select {
        case m := <- queue:
            fmt.Printf("Consume%d message: %s\n", num, m.msg)
            time.Sleep(5 * time.Second)
        }
    }
}

func main() {
    queue := make(chan Message, 5)
    go Producer(queue)
    go Consumer(queue, 1)
    go Consumer(queue, 2)

    time.Sleep(20 * time.Second)
}

输出:

Produced message 1
Produced message 2
Produced message 3
Produced message 4
Produced message 5
Produced message 6
Produced message 7
Consume2 message: message 2
Consume1 message: message 1
Consume1 message: message 3
Consume2 message: message 4
Produced message 8
Produced message 9
Consume2 message: message 5
Produced message 10
Produced message 11
Consume1 message: message 6
Produced message 12
Consume2 message: message 7
Produced message 13
Consume1 message: message 8
Consume2 message: message 9
Produced message 14
Consume1 message: message 10
Produced message 15
Producer completed!
Consume2 message: message 11
Consume1 message: message 12
Consume1 message: message 13
Consume2 message: message 14
Consume1 message: message 15

函数与 channel

channel 可以用来作为函数的参数或者返回值,但是其表现形式有三种:

chan