letsfire / factory

Go语言的协程池 , 节省内存 , 减少GC压力
MIT License
15 stars 0 forks source link

数组越界BUG #4

Open buguang01 opened 4 years ago

buguang01 commented 4 years ago
func (m *Master) AdjustSize(newSize int) {
    if int64(newSize) > m.maxNum {
        newSize = int(m.maxNum)
    }

    m.Lock()
    defer m.Unlock()

    if diff := newSize - int(m.ingNum); diff > 0 {
        for i := 0; i < diff; i++ {
            m.workers[m.ingNum] = newWorker()
            atomic.AddInt64(&m.ingNum, 1)
        }
    } else if diff < 0 {
        atomic.StoreInt64(&m.ingNum, int64(newSize))
        if cursor := atomic.LoadInt64(&m.cursor); cursor > int64(newSize) {
            atomic.StoreInt64(&m.cursor, int64(newSize))
        }
        for _, w := range m.workers[newSize:] {
            if w == nil {
                break
            }
            w.shutdown()
        }
        m.workers = m.workers[0:newSize]
    }
}

func (m *Master) Running() int64 {
    return atomic.LoadInt64(&m.ingNum)
}

func (m *Master) Shutdown() {
    m.AdjustSize(0) // 关闭所有worker
}

func (m *Master) getWorker() *worker {
    atomic.CompareAndSwapInt64(&m.cursor, m.ingNum, 0)
    idx := atomic.AddInt64(&m.cursor, 1)
    w := m.workers[idx-1]
    return w
}

当cursor与原ingNum相等时,你要进行缩容 这个时候,如果是先把ingNum改成了新的值,再 atomic.CompareAndSwapInt64(&m.cursor, m.ingNum, 0) 运行这行时,就不会修改成功,然后你对cursor,进行了累加,导致越界,或有可能拿到一个nil对象。 还有就是你的代码里,对workers初始化的时候,用的是maxNum 但后面修改的时候,又用了ingNum做为他的长度。 我原以为你是要一开始声明一个确定长度的数组。但后面又换了,如果按你后面的逻辑 你一开始应该是 workers: make([]*worker, initNum,maxNum) 才对。

letsfire commented 4 years ago

大佬,帮忙Review

buguang01 commented 4 years ago

你写了一个比之前复杂的保证安全的逻辑,但仔细看,还是会有问题

if atomic.CompareAndSwapInt32(&m.tryFlag, 0, 1) {
        atomic.StoreInt64(&m.cursor, 0)
        atomic.StoreInt32(&m.tryFlag, 0)
        m.synCond.Broadcast()
    } else {
        m.synCond.L.Lock()
        m.synCond.Wait()
        m.synCond.L.Unlock()
    }

你不能保证你在m.synCond.Broadcast()时,所有其他协程都在m.synCond.Wait() 虽然你的逻辑也不能保证m.synCond.Broadcast()之前,只有一个协程进入if的then里,但如果只有一个进入了then而有2个进入了else其中一个到了Wait,还有一个在Lock()等待进入中 这个时候你的m.synCond.Broadcast()只能让之前在wait的出来,然后Lock()就进入了wait。 这个时候,如果没有新的协程去m.synCond.Broadcast(),那这个Wait就会一直wiat下去。

虽然Wait方法里面会在添加监听后,释放锁,让别的协程可以进来,但是依然存在着我上面说的那种情况发生的概率;

buguang01 commented 4 years ago

你把workers从数组改成了双map进行切换,但你写的并没有达到目标,只是map在获取的时候,可以检查而已,并不需要双map 如果使用双map那每次就应该只更新其中一个map,但那样的话就会导致资源的不释放问题,和你写这个东西的初衷就违背了。

buguang01 commented 4 years ago

还有一个问题,那就是你的双map是同一个map,导致一修改数量,就会出现 fatal error: concurrent map read and map write

buguang01 commented 4 years ago
func TestMaster(t *testing.T) {
    wg := sync.WaitGroup{}
    wg2 := sync.WaitGroup{}
    w := NewMaster(10, 2)
    ctx, cel := context.WithCancel(context.Background())
    li := w.AddLine("testli", func(e interface{}) {
        defer wg.Done()
    })
    for i := 0; i < 100; i++ {
        go func() {
            wg2.Add(1)
            defer wg2.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    wg.Add(1)
                    li.Submit(0)
                }
            }
        }()
    }
    time.Sleep(time.Second * 5)
    fmt.Println("adjustsize")
    w.AdjustSize(6)
    time.Sleep(time.Second * 2)
    w.AdjustSize(1)
    time.Sleep(time.Second * 2)
    cel() //关闭发协程
    w.AdjustSize(2)
    time.Sleep(time.Second * 2)
    wg2.Wait() //确认发协程是否能关闭,多半会卡在这里
    fmt.Println("down")
    w.Shutdown()
    fmt.Println("Wait")
    wg.Wait()
    fmt.Println("End")
}

给你一段测试代码。 改到你能看到输出end就行

buguang01 commented 4 years ago

记的多跑几次,几个问题都会暴露出来的。

letsfire commented 4 years ago

感谢,这段工作较忙,问题已经修复,使用了sync.Map,您的测试用例已通过。再次感谢

buguang01 commented 4 years ago

你的代码需要优化。用了好多互斥锁。单线程,原子锁。

letsfire commented 4 years ago

还请指点,resetGuard里面是多,但是这段代码执行少,复杂度可以忽略吧

buguang01 commented 4 years ago

Master.resetGuard 这个是用来干什么用的?

letsfire commented 4 years ago

确保只有一个协程能重置m.cursor

buguang01 commented 4 years ago

确保只有一个协程能重置m.cursor

原因呢? 你里面本来就是一个原子操作了

buguang01 commented 4 years ago

还有就是getWorker为什么是递归?不应该是循环吗?

letsfire commented 4 years ago

m.workers.Load(idx) 同一时间会有多个协程拿不到worker进而去下面重置cursor,这时候下面的guard就是仅让其中一个协程进行重置,其他等待重置完毕继续获取从头获取