sinomoe / sino.moe

just a simple issue blog
http://sino.moe

Implementing the Worker concurrency pattern in Go #9

sinomoe opened 6 years ago


Preface

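Go's concurrency support is very powerful: with goroutines, writing concurrent programs in Go becomes quite easy. I have long wanted to write something down about Go concurrency patterns. In this post I implement the Worker concurrency pattern in Go, which puts Go's concurrency features to use in a more concrete way.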

By the end of this post, we will:

The code is organized as follows:

/go_worker_pool
    /work
        work.go
    /pool
        worker.go
        dispatcher.go
    bench_test.go
    main.go

The final output looks something like this:

misakimeinoMacBook-Pro:go_worker_pool sino$ go run main.go 
2018/10/14 20:45:46 worker[0] starting
2018/10/14 20:45:46 worker[1] starting
2018/10/14 20:45:46 worker[2] starting
2018/10/14 20:45:46 worker[3] starting
2018/10/14 20:45:46 Worker[2]: Doing Work[1] hash word["c2WD8F2q"] to ["3727323148"]
2018/10/14 20:45:46 Worker[0]: Doing Work[0] hash word["BpLnfgDs"] to ["1271963291"]
2018/10/14 20:45:46 Worker[1]: Doing Work[2] hash word["NfHK5a84"] to ["3104814218"]
2018/10/14 20:45:46 Worker[3]: Doing Work[3] hash word["jjJkwzDk"] to ["3984948990"]
...

Simulating some work

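First, write work.go to simulate some work to be executed concurrently. The file also defines the Workable interface, which describes the behavior of a piece of work. The code is as follows: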

package work

import (
    "hash/fnv"
    "log"
    "math/rand"
    "os"
    "time"
)

// Work is a sample work type; *Work implements the Workable interface
type Work struct {
    ID  int
    Job string
}

// Workable is implemented by any work that has a Do method
type Workable interface {
    // Do runs the work; its argument is the id of the executing worker
    Do(int)
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")

func randomString(length int) string {
    rs := make([]rune, length)
    for i := range rs {
        rs[i] = letters[rand.Intn(len(letters))]
    }
    return string(rs)
}

// MockSomeWorks mocks amount sample works with random 8-character jobs;
// take the address of an element (&works[i]) to obtain a Workable
func MockSomeWorks(amount int) []Work {
    works := make([]Work, amount)
    for i := range works {
        works[i] = Work{i, randomString(8)}
    }
    return works
}

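// Do implements the Workable interface: it hashes the job string with
// 32-bit FNV-1a, logs the result when DEBUG=true, and sleeps 500ms
// to simulate a time-consuming task
func (w *Work) Do(workerID int) {
    hash := fnv.New32a()
    hash.Write([]byte(w.Job)) // Write on a hash.Hash never returns an error
    if os.Getenv("DEBUG") == "true" {
        log.Printf("Worker[%d]: Doing Work[%d] hash word[\"%s\"] to [\"%d\"]", workerID, w.ID, w.Job, hash.Sum32())
    }
    time.Sleep(time.Second / 2)
}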

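The Workable interface defines only one method, Do(int). Its argument is the id of the Worker executing the work.

Calling MockSomeWorks returns a slice of Work, that is, all the tasks to be processed concurrently. Because Do has a pointer receiver, it is the address of each element (&works[i]) that implements the Workable interface.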
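Because Do is declared with a pointer receiver, only *Work satisfies Workable. A plain Work value does not. The self-contained sketch below uses trimmed copies of the types above, without the hashing and sleeping. It shows a compile-time assertion of this fact and one way to turn a []Work into Workable values. toWorkables is a hypothetical helper and is not part of the original code.

```go
package main

import "fmt"

// Work and Workable are trimmed copies of the types in work.go.
type Work struct {
	ID  int
	Job string
}

type Workable interface {
	Do(int)
}

// Do has a pointer receiver, so only *Work has Do in its method set.
func (w *Work) Do(workerID int) {
	fmt.Printf("worker[%d] doing work[%d] %q\n", workerID, w.ID, w.Job)
}

// Compile-time assertion: this line fails to compile if *Work stops
// implementing Workable. Writing Work{} instead of (*Work)(nil) would not compile.
var _ Workable = (*Work)(nil)

// toWorkables is a hypothetical helper: it takes the address of each
// slice element so that every entry satisfies Workable.
func toWorkables(works []Work) []Workable {
	out := make([]Workable, len(works))
	for i := range works {
		out[i] = &works[i] // address of the element itself, not of a copy
	}
	return out
}

func main() {
	works := []Work{{0, "BpLnfgDs"}, {1, "c2WD8F2q"}}
	for _, w := range toWorkables(works) {
		w.Do(0)
	}
}
```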

Implementing the Worker

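A Worker is like a worker on an assembly line. It is responsible for just two things: doing the work at hand, and notifying the dispatcher when it has nothing to do. Here is the definition of worker: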

package pool

import (
    "log"

    "github.com/sinomoe/go_worker_pool/work"
)

// worker represents a single goroutine that executes work
type worker struct {
    // id is the worker's unique identifier
    id            int
    // channel is used to receive new work
    channel       chan work.Workable
    // workerChannel is shared by all workers and holds the channels of idle workers
    workerChannel chan chan work.Workable
    // end is used to receive the end signal
    end           chan bool
}

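A worker represents one goroutine. Each worker has its own id, its own channel (for receiving work), and its own end channel (for receiving the end signal). All workers share a single workerChannel, which is used to send and receive the channels of idle workers.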
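workerChannel has type chan chan work.Workable, so its elements are themselves channels. An idle worker registers itself by sending its private channel. The dispatcher then sends work on whichever channel it receives. The following self-contained sketch shows only that handshake. It makes two simplifications that are not part of the original code: int stands in for work.Workable, and a results channel is added purely so the example can observe what the worker did.

```go
package main

import "fmt"

// handshake runs one round of the idle-worker protocol: a worker
// announces itself on workerChannel, and the dispatcher sends a job
// to the channel it receives. It returns what the worker produced.
func handshake(job int) int {
	workerChannel := make(chan chan int, 1) // shared queue of idle workers
	results := make(chan int)               // only for observing the result

	go func() {
		own := make(chan int) // this worker's private channel
		workerChannel <- own  // "I am idle": register with the dispatcher
		j := <-own            // wait for work addressed to this worker
		results <- j * 2      // stand-in for calling Do
	}()

	idle := <-workerChannel // dispatcher: pick an idle worker
	idle <- job             // hand the job directly to that worker
	return <-results
}

func main() {
	fmt.Println(handshake(21)) // prints 42
}
```

Because each worker owns its channel, the dispatcher never has to track which worker is free. Whatever channel comes out of workerChannel belongs to a worker that is currently waiting.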

Two behaviors are defined for a worker, start and stop:

// start spawns a new goroutine which represents a worker.
// In each iteration the worker first sends its own channel to
// workerChannel to announce that it is idle, then waits on 2 channels:
// 1. end, for the end signal
// 2. channel, for work to be done
func (w *worker) start() {
    go func() {
        for {
            w.workerChannel <- w.channel
            select {
            case <-w.end:
                return
            case work1 := <-w.channel:
                work1.Do(w.id)
            }
        }
    }()
}

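// stop sends the end signal to the worker. Because end is unbuffered,
// stop blocks until the worker has finished its current work and received the signal.
func (w *worker) stop() {
    log.Printf("stopping worker[%d]", w.id)
    w.end <- true
}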

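Once a worker is started, it spawns a new goroutine. In each iteration, this goroutine first sends its own channel to workerChannel to mark itself as idle. It then waits for either new work or the end signal. When new work arrives, it calls that work's Do method. When the end signal arrives, the goroutine returns.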
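One property of this design is worth making explicit. end is unbuffered, and a worker only reaches its select between jobs. Together these mean stop cannot return while the worker is still inside Do. The self-contained sketch below checks that property. It reuses the loop from start, but replaces work.Workable with a plain func type (job) and adds a hypothetical helper, stopWaitsForJob. Neither of these is in the original code.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// job is a simplified stand-in for work.Workable.
type job func(workerID int)

// worker has the same fields as the original type.
type worker struct {
	id            int
	channel       chan job
	workerChannel chan chan job
	end           chan bool
}

// start runs the same loop as the original: register as idle,
// then wait for either the end signal or a job.
func (w *worker) start() {
	go func() {
		for {
			w.workerChannel <- w.channel
			select {
			case <-w.end:
				return
			case j := <-w.channel:
				j(w.id)
			}
		}
	}()
}

// stop blocks until the worker goroutine receives the signal;
// since end is unbuffered, that can only happen between jobs.
func (w *worker) stop() { w.end <- true }

// stopWaitsForJob hands a slow job to a worker, stops the worker right
// away, and reports whether the job had finished when stop returned.
func stopWaitsForJob() bool {
	var finished int32
	w := &worker{0, make(chan job), make(chan chan job, 1), make(chan bool)}
	w.start()

	idle := <-w.workerChannel // the worker announces it is idle
	idle <- func(int) {
		time.Sleep(50 * time.Millisecond) // simulate slow work
		atomic.StoreInt32(&finished, 1)
	}
	w.stop()
	return atomic.LoadInt32(&finished) == 1
}

func main() {
	fmt.Println(stopWaitsForJob()) // prints true
}
```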

Scheduling and managing workers

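As mentioned above, a Worker is like a worker on an assembly line, so we also need someone to schedule all the workers: the dispatcher. The dispatcher is mainly responsible for three things:

1. receiving new work and handing it to idle workers,
2. keeping track of idle workers, and
3. sending the end signal to the workers.

Because the dispatcher is itself a separate goroutine, the outside world talks to it through a Collector. The Collector receives work and the end signal from outside and passes them to the dispatcher. The dispatcher then dispatches them to the appropriate workers, and each worker finally receives them and acts accordingly. Below is dispatcher.go, which contains the implementation of both the Collector and the dispatcher.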

// Package pool defines the workers' behavior
// and provides a way to manage multiple goroutines
package pool

import (
    "log"
    "sync"

    "github.com/sinomoe/go_worker_pool/work"
)

// Collector is the bridge for communicating with the workers.
// It has
// 1. a work channel that receives new work,
// which is then sent to an available worker;
// 2. an end channel that receives the end signal,
// which is then sent to all workers.
type Collector struct {
    work chan work.Workable
    end  chan bool
    wg   sync.WaitGroup
}

// Send sends w to the worker pool; it blocks until the dispatcher accepts it
func (c *Collector) Send(w work.Workable) {
    c.work <- w
}

// End sends the end signal to the worker pool
// and waits until all workers have stopped
func (c *Collector) End() {
    c.end <- true
    c.wg.Wait()
}

// StartDispatcher starts workerAmount workers
// and a goroutine that schedules them.
// It returns a *Collector, through which we can send new
// work to the workers or stop them all.
func StartDispatcher(workerAmount int) *Collector {
    workerChannel := make(chan chan work.Workable, workerAmount)
    workers := make([]worker, workerAmount)

    input := make(chan work.Workable)
    end := make(chan bool)
    collector := Collector{
        work: input,
        end:  end,
    }
    collector.wg.Add(workerAmount)

    // init all workers:
    // every worker has its own channel and end channel,
    // while all workers share one workerChannel
    for i := range workers {
        workers[i] = worker{i, make(chan work.Workable), workerChannel, make(chan bool)}
        log.Printf("worker[%d] starting\n", i)
        workers[i].start()
    }

    // schedule Workers
    go func() {
        for {
            select {
            case <-end:
                for i := range workers {
                    workers[i].stop()
                    collector.wg.Done()
                }
                return
            case work1 := <-input:
                worker := <-workerChannel
                worker <- work1
            }
        }
    }()

    return &collector
}