499689317 / notes

note
2 stars 0 forks source link

Golang并发场景与超时控制 #36

Open 499689317 opened 2 years ago

499689317 commented 2 years ago
func InitDo() {
    ch := Do()
    // 使用for range形式读取chan,操作完成后需要close掉才能执行后续代码,否则死锁
    for v := range ch {
        fmt.Println("output r", v)
    }

    fmt.Println("main process go on....")
}

// 常见的waitgroup并发控制,协程的退出取决于写入ch的值被读取,未被读取前协程处于阻塞状态造成协程泄漏
// waitgroup只是为了控制close chan的时机,只有waitgroup才能知道业务什么时候结束了
func Do() <-chan int {
    ch := make(chan int, 100)
    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go func(ii int) {
            defer func() {
                wg.Done()
            }()
            fmt.Println("begin", ii)
            time.Sleep(time.Second * 5)
            r := ii * ii
            ch <- r
            // fmt.Println("after", r)
        }(i)
    }
    go func() {
        wg.Wait()
        close(ch)
        fmt.Println("monitor")
    }()
    return ch
}
499689317 commented 2 years ago
// 控制任务只执行2s,超过时间退出
func InitCxtDo() {
    cxt, cancel := context.WithTimeout(context.TODO(), time.Second*2)
    defer cancel()

    ch := cxtDo(cxt)
    isTimeout := false
    isExit := false
    // for select读取chan,似乎并不能准确判断所有协程是否都已经执行完毕,只能一直处于监听chan的状态直到超时
    // 因为没有使用waitgroup,所以并不知道在什么时候将chan close掉
    for {
        select {
        case <-cxt.Done():
            isTimeout = true
        case v, ok := <-ch:
            fmt.Println("output <-ch", v, ok)
            if !ok {
                isExit = true
            }
        }
        if isTimeout {
            fmt.Println("cxt timeout exit")
            break
        }
        if isExit {
            fmt.Println("go done exit")
            break
        }
    }

    time.Sleep(time.Second * 10)
    fmt.Println("main process go on.....")
}

func cxtDo(cxt context.Context) <-chan int {
    ch := make(chan int, 10)
    for i := 0; i < 10; i++ {
        // 实际场景中,交给协程一个任务,任务完成即可退出,并不需要持续执行任务
        go func(ii int) {
            fmt.Println("begin", ii)
            // TODO 当协程内任务为阻塞操作时,这种写法无法控制最后一个协程的生命周期,因为阻塞操作总是阻止后续代码的执行
            // 似乎并不严谨???
            c := make(chan int)
            go func() {
                // 还是没有办法控制这个协程的退出???我们在使用协程处理耗时操作时,必需要预估当前耗时操作的最坏结果,否则就可能会出现内存泄漏
                time.Sleep(time.Second * 5)
                r := ii + 10
                c <- r
            }()
            // 让协程处理完本次任务就退出,不在处理后续任务
            select {
            case <-cxt.Done():
                fmt.Println("cxt done")
            case v := <-c:
                ch <- v
                // fmt.Println("after", v)
            }
        }(i)
    }
    return ch
}
499689317 commented 2 years ago
func InitUpCxtDo() {
    cxt, cancel := context.WithTimeout(context.TODO(), time.Second*2)
    defer cancel()

    pch := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            pch <- i
        }
    }()

    ch := upCxtDo(cxt, pch)
    isExit := false
    for {
        select {
        case <-cxt.Done():
            fmt.Println("cxt timeout done")
            isExit = true
        case v := <-ch:
            fmt.Println("output <-ch", v)
        }
        if isExit {
            break
        }
    }
    fmt.Println("此时虽然超时时间到了,但是upCxtDo函数开启的子协程却并未退出,继续在后台运行")
    time.Sleep(time.Second * 10)
    fmt.Println("main process go on.....")
}

func upCxtDo(cxt context.Context, pch <-chan int) <-chan int {
    ch := make(chan int, 10)

    // var wg sync.WaitGroup
    // wg.Add(10)

    for i := 0; i < 10; i++ {
        // 实际场景中,开启有限个协程监听任务不断过来,并在某些情况下能退出协程防止泄漏
        go func() {

            // defer func() {
            //  wg.Done()
            // }()

            isTimeout := false
            // 这个for select负责监听要处理的任务,这种写法的好处是整个for select覆盖了业务层的处理逻辑,如果协程收到退出信号则整个协程都可以主动退出
            // 但是阻塞操作会卡住当前执行的for循环,此时即使cxt<-Done()超时了,也需要在下一次for循环时才能读到信号
            for {
                select {
                case <-cxt.Done():
                    // 但实际情况却有些不一样,超时信号需要等到for循环的下一帧才执行,因为上一帧被其它业务阻塞了
                    // 还是得根据实际情况来操作阻塞代码的超时,选用有超时机制的第三方库是比较好的选择
                    isTimeout = true
                case v := <-pch:
                    fmt.Println("begin", v)
                    // 拿到任务并处理,再通过chan传到外层,注意:这里如果被阻塞了,那么cxt.Done()是不能及时得到处理的,需要等到下一次循环执行
                    time.Sleep(time.Second * 5)
                    r := v * v
                    ch <- r
                    // fmt.Println("after", r)
                }
                if isTimeout {
                    fmt.Println("cxt done")
                    break
                }
            }
        }()
    }

    // go func() {
    //  wg.Wait()
    //  close(ch)
    // }()

    return ch
}
499689317 commented 2 years ago

在实际使用协程中,要对阻塞操作进行控制,我们没办法去干预一个已经被阻塞的协程并让其退出

func InitUpgradeCxt() {
    cxt, cancel := context.WithTimeout(context.TODO(), time.Second*2)
    defer cancel()

    pch := make(chan int, 10)
    for i := 0; i < 10; i++ {
        pch <- i
    }

    ch := upgradeCxtDo(pch)
    isTimeout := false
    for {
        select {
        case <-cxt.Done():
            isTimeout = true
            // TODO...
            close(pch)
            fmt.Println("cxt timeout done")
        case v := <-ch:
            fmt.Println("output <-ch", v)
        }
        if isTimeout {
            break
        }
    }
    fmt.Println("超时后子协程似乎没有退出????")
    time.Sleep(time.Second * 10)
    fmt.Println("main process go on.....")
}

func upgradeCxtDo(pch <-chan int) <-chan int {
    ch := make(chan int)
    for i := 0; i < 10; i++ {
        go func() {
            // 只要协程内的业务被阻塞住,不管怎么去监听都似乎不在起作用了????
            // 不管是for select或者for range都不能监听到退出信号
            for v := range pch {
                fmt.Println("exec v", v)
                time.Sleep(time.Second * 5)
                r := v * v
                ch <- r
            }
            // 即使调用了close pch操作,但只要是for循环内部被阻塞了,都无法监听到退出信号,只能干等被阻塞操作
            // 正常情况下close pch后,for range是会正常退出的
            fmt.Println("child exit")
        }()
    }
    return ch
}