kevinyan815 / gocookbook

go cook book
MIT License
789 stars 167 forks source link

并发编程之ErrorGroup--子goroutine的错误传播和取消执行 #35

Open kevinyan815 opened 3 years ago

kevinyan815 commented 3 years ago

在并发编程里,sync.WaitGroup并发原语的使用频率非常高,它经常用于协同等待的场景:goroutine A 在检查点(checkpoint)等待一组执行任务的 worker goroutine 全部完成,如果在执行任务的这些 goroutine 还没全部完成, goroutine A 就会阻塞在检查点,直到所有woker goroutine 都完成后才能继续执行。

如果在woker goroutine的执行过程中遇到错误并想要处理该怎么办?WaitGroup并没有提供传播错误的功能,遇到这种场景我们改怎么办?Go语言在扩展库的提供了ErrorGroup并发原语正好适合在这种场景下使用,它在WaitGroup的基础上还提供了,错误传播以及上下文取消的功能

Go扩展库通过errorgroup.Group提供ErrorGroup原语的功能,它有三个方法可调用:

func WithContext(ctx context.Context) (*Group, context.Context)
func (g *Group) Go(f func() error)
func (g *Group) Wait() error

接下来我们让主goroutine使用ErrorGroup代替WaitGroup等待所以子任务的完成,ErrorGroup有一个特点是会返回所以执行任务的goroutine遇到的第一个错误。我们试着执行一下下面的程序,观察一下程序的输出。

package main

import (
    "fmt"
    "log"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    var eg errgroup.Group
    for i := 0; i < 100; i++ {
        i := i
        eg.Go(func() error {
            time.Sleep(2 * time.Second)
            if i > 90 {
                fmt.Println("Error:", i)
                return fmt.Errorf("Error occurred: %d", i)
            }
            fmt.Println("End:", i)
            return nil
        })
    }
    if err := eg.Wait(); err != nil {
        log.Fatal(err)
    }
}

上面程序,遇到i大于90的都会产生错误结束执行,但是只有第一个执行时产生的错误被ErrorGroup返回,程序的输出大概如下:

......
End: 49
End: 26
Error: 98
End: 63
End: 39
End: 50
End: 38
Error: 95
End: 67
End: 65
End: 57
End: 64
2020/12/17 18:11:40 Error occurred: 98

最早执行遇到错误的goroutine输出了Error: 98但是所有未执行完的其他任务并没有停止执行,那么想让程序遇到错误就终止其他子任务该怎么办呢?我们可以用errgroup.Group提供的WithContext方法创建一个带可取消上下文功能的ErrorGroup

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    eg, ctx := errgroup.WithContext(context.Background())

    for i := 0; i < 100; i++ {
        i := i
        eg.Go(func() error {
            time.Sleep(2 * time.Second) 

            select {
            case <-ctx.Done():
                fmt.Println("Canceled:", i)
                return nil
            default:
                if i > 90 {
                    fmt.Println("Error:", i)
                    return fmt.Errorf("Error: %d", i)
                }
                fmt.Println("End:", i)
                return nil
            }
        })
    }
    if err := eg.Wait(); err != nil {
        log.Fatal(err)
    }
}

Go方法单独开启的gouroutine在执行参数传递进来的函数时,如果函数返回了错误,会对ErrorGroup持有的err字段进行赋值并及时调用cancel函数,通过上下文通知其他子任务取消执行任务。所以上面更新后的程序会员如下类似的输出。

......
Error: 99
Canceled: 68
Canceled: 85
End: 57
End: 51
Canceled: 66
Canceled: 93
Canceled: 72
Canceled: 78
End: 55
Canceled: 74
2020/12/17 18:23:12 Error: 99

使用errorgroup.Group时注意它的两个特点