kevinyan815 / gocookbook

go cook book
MIT License
788 stars 167 forks source link

预防并发搞垮友军的几个方法 #63

Open kevinyan815 opened 3 years ago

kevinyan815 commented 3 years ago

巧用WaitGroup

因为go对并发的原生支持使得并发编程难度大大降低,刚学会Go语言的人特别喜欢在开发的时候尝试并发,其实并发并不是解决所有问题的银弹,反而是一味想尝试并发造成了不少线上BUG/事故。比如说,有的人会误以为起几十个线程休眠一下,再接着起线程就能控制并发数了,其实不是,比如像下面这么写

func badConcurrency() {
    batchSize := 50
    for {
        data, _ := queryDataWithSizeN(batchSize)
        if len(data) == 0 {
            break
        }

        for _, item := range data {
            go func(i int) {
                doSomething(i)
            }(item)
        }

        time.Sleep(time.Second * 1)
    }
}

对于调用者来说,看起来确实是控制了一秒中只发出去了50个请求,我们不能只从调用者的角度考虑事情,如果恰巧赶上对方正忙在,你程序休眠的时候下游服务并没有处理完你发过去的这批请求,这时你再发一批过去,累计下来无疑是对对方的服务器雪上加霜。最好的是等到上一批并发都返回了再去开启下一批,这个可以通过WaitGroup实现。

func useWaitGroup() {

    batchSize := 50
    for {
        data, _ := queryDataWithSizeN(batchSize)
        if len(data) == 0 {
            fmt.Println("End of all data")
            break
        }
        var wg sync.WaitGroup
        for _, item := range data {
            wg.Add(1)
            go func(i int) {
                doSomething(i)
                wg.Done()
            }(item)
        }
        wg.Wait()

        fmt.Println("Next bunch of data")
    }
}

巧用Semaphore

上面是一批处理完再开启下一批并发,也可以一个处理完后下一个补上,但同时发起的请求书最多不超过N个的限制,这个可以通过信号量实现。

func useSemaphore() {
    var concurrentNum int64 = 10
    var weight int64 = 1
    var batchSize int = 50
    s := semaphore.NewWeighted(concurrentNum)
    for {
        data, _ := queryDataWithSizeN(batchSize)
        if len(data) == 0 {
            fmt.Println("End of all data")
            break
        }

        for _, item := range data {
                        s.Acquire(context.Background(), weight)
            go func(i int) {
                doSomething(i)
                s.Release(weight)
            }(item)
        }

    }
}

使用限速器

再有就是使用限速器了,这个不像上面两个可以等到请求返回再开启下一个/一批,而是实实在在的限流,可以通过官方库提供的 time/rate 限流器实现。

func useRateLimit() {
    limiter := rate.NewLimiter(rate.Every(1*time.Second), 50)
    batchSize := 50
    for {
        data, _ :=queryDataWithSizeN(batchSize)
        if len(data) == 0 {
            fmt.Println("End of all data")
            break
        }

        for _, item := range data {
            // blocking until the bucket have sufficient token
            err := limiter.Wait(context.Background())
            if err != nil {
                fmt.Println("Error: ", err)
                return
            }
            go func(i int) {
                doSomething(i)
            }(item)
        }
    }
}

使用生产者消费者模式

利用channel实现一个生产者消费者队列模式,生产者从库里捞数据发送到通道,消费者通过通道接收数据做处理。

func useChannel() {
    batchSize := 50
    dataChan := make(chan int)
    var wg sync.WaitGroup
    wg.Add(batchSize + 1)
    // 生产者
    go func() {
        for {
            data, _ := queryDataWithSizeN(batchSize)
            if len(data) == 0 {
                break
            }
            for _, item := range data {
                dataChan <- item
            }
        }
        close(dataChan)
        wg.Done()
    }()
    // 消费者
    go func() {
        for i := 0; i < 50; i++ {
            go func() {
                for {
                    select {
                    case v, ok := <- dataChan:
                        if !ok {
                            wg.Done()
                            return
                        }
                        doSomething(v)
                    }
                }
            }()
        }
    }()

    wg.Wait()
}

完整代码参考:https://github.com/kevinyan815/gocookbook/blob/master/codes/prevent_over_concurrency/main.go