letsfire / factory

Go语言的协程池 , 节省内存 , 减少GC压力
MIT License
15 stars 0 forks source link

关闭风险 #3

Open buguang01 opened 4 years ago

buguang01 commented 4 years ago
func newWorker() (w *worker) {
    w = &worker{
        params: make(chan interface{}),
    }
    go func(w *worker) {
        for {
            if w.process() {
                break
            }
            atomic.StoreInt32(&w.isBusy, 0)
        }
        // 置为繁忙状态
        atomic.StoreInt32(&w.isBusy, 1)
        // 可能存在任务
        select {
        case params := <-w.params:
            w.action(params)
        default:
        }
        // 关闭任务通道
        close(w.params)
    }(w)
    return
}

在退出的时候,你这边考虑了有可能有某个任务正在写入新的参数。但是在你的这段代码运行到select的时候,也有可能那边还没开始写入参数,导致你这边运行到了default然后就把通道关闭了,导致写入方出现异常。

buguang01 commented 4 years ago

还有,就算你在这里读到了任务参数,开始运行了,但是你在select中做的w.action(params),没有做异常保护。也可能导致出问题。

letsfire commented 4 years ago

第一个问题,我已经先把work标记成busy了,不会再有新写入进入了吧 第二个问题周末修复一下,感谢您的反馈

buguang01 commented 4 years ago

如果在 // 置为繁忙状态 atomic.StoreInt32(&w.isBusy, 1) 之前, if atomic.CompareAndSwapInt32(&w.isBusy, 0, 1) { 这个刚被执行呢?

letsfire commented 4 years ago

我加了一个定时器解决您提的这个问题,至于action出错,我的设想是action自行处理的,line.go里面也有SetPanicHandler,worker.go里的process也只是防止worker死掉,出错了组件这里也是不知道怎么回补的,这些应该action自己处理吧

buguang01 commented 4 years ago
func (w *worker) assign(action func(interface{}), params interface{}) bool {
    if atomic.CompareAndSwapInt32(&w.isBusy, 0, 1) {
        time.Sleep(time.Second)
        w.action = action
        w.params <- params
        return true
    }
    return false
}

如果你在这里加上一个sleep我说的这个BUG就很容易暴露了。 这是一个风险。

buguang01 commented 4 years ago
func TestMaster(t *testing.T) {
    wg := sync.WaitGroup{}
    wg2 := sync.WaitGroup{}
    w := factory.NewMaster(100, 2)
    ctx, cel := context.WithCancel(context.Background())
    li := w.AddLine("testli", func(e interface{}) {
        defer wg.Done()
        time.Sleep(time.Second * 1)
    })
    for i := 0; i < 100; i++ {
        go func() {
            wg2.Add(1)
            defer wg2.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    wg.Add(1)
                    li.Submit(0)
                }
            }
        }()
    }
    time.Sleep(time.Second * 5)
    fmt.Println("adjustsize")
    for i := 0; i < 100; i++ {
        w.AdjustSize(100)
        time.Sleep(time.Second)
        w.AdjustSize(1)
        time.Sleep(time.Second)
    }
    fmt.Println("for end")
    cel()      //关闭发协程
    wg2.Wait() //确认发协程是否能关闭,多半会卡在这里
    fmt.Println("down")
    w.Shutdown()
    fmt.Println("Wait")
    wg.Wait()
    fmt.Println("End")
}

测试

buguang01 commented 4 years ago

你用isBusy的0,1表示是否在被占用,那关闭就是一直被占用,你可以在发关闭信号的时候,把这个改成2,然后写入那个值。

func (w *worker) shutdown() {
        for !atomic.CompareAndSwapInt32(&w.isBusy, 0, 2) {

        }
        w.params <- exitSignal{}
       close(w.params)
}

然后在

for {
    if w.process() {
        break
    }
    atomic.CompareAndSwapInt32(&w.isBusy, 1,0)
}

其他地方你再改改

buguang01 commented 4 years ago

优化的好一点,其实还可以在shutdown方法里强改,或chan的缓存为2等等。