letsfire / factory

Go语言的协程池 , 节省内存 , 减少GC压力
MIT License
15 stars 0 forks source link

master.Shutdown()结束后,协程池中的协程并未全部退出 #1

Closed zhaomin1993 closed 4 years ago

zhaomin1993 commented 5 years ago
package factory

import (
    "fmt"
    "runtime"
    "sync"
    "testing"
    "time"
)

type hh struct {
    mux *sync.Mutex
    a int
}

//go test -v -test.run TestNewMaster
func TestNewMaster(t *testing.T) {
    master := NewMaster(1000, 1000)
    // 新建第一条工作流水线
    var line1 = master.AddLine("demo.line.1", func(args interface{}) {
        h := args.(hh)
        h.mux.Lock()
        h.a++
        if h.a%10000 == 0 {
            fmt.Println(h.a)
        }
        h.mux.Unlock()
        time.Sleep(time.Millisecond*10)
    })

    // 根据业务场景将参数提交
    mux := &sync.Mutex{}
    h := hh{mux:mux}
    for i := 0; i < 1000000; i++ {
        h.a = i
        line1.Submit(h)
    }

    // 协程池数量可动态调整
    master.Running()            // 正在运行的协程工人数量
    //master.AdjustSize(100)      // 指定数量进行扩容或缩容
    master.Shutdown()           // 等于 master.AdjustSize(0)
    fmt.Println(runtime.NumGoroutine())
    time.Sleep(time.Second*5)
    fmt.Println(runtime.NumGoroutine())
}
/*
从上面代码执行结束后可以看到还有很多协程未退出,也就是说还有很多任务没有执行完,  
也就无法判断什么时候所有任务都执行完了,如果我要拿到所有执行结果,你的协程池就没法用
*/
letsfire commented 5 years ago

你要拿到执行结果是什么意思,AddLine添加的方法本身就没有返回值的,也就是说适合执行任务型的工作,按你的代码执行 runtime.NumGoroutine() [头一个1002,后一个是2,是没问题的。Shutdown并不是立即关闭协程的,而是发出一个关闭信号,等手头任务都执行完毕了才释放的,所以你休眠5秒后是已经都释放了的。代码我放到了 ./issue/0001_test.go

letsfire commented 5 years ago

可能需要阻塞Shutdown,等释放完毕了在返回?Shutdown和真正协程销毁之间有微小的时间差,比你AddLine的方法执行时间微长一点点点

zhaomin1993 commented 5 years ago

建议master.Shutdown()加上sync.WaitGroup的功能,不然可能main退出了,任务都还没跑完,也不用去估算一个时间

letsfire commented 5 years ago

抱歉,刚才 比你AddLine的方法执行时间微长一点点点 这个结论是错误的。 这个问题其实不用纠结,调用Shutdown()之后协程虽然没有销毁,但是任务已经确保执行完毕了的,只是还没来得及销毁,所以 runtime.NumGoroutine() 需要等待一会儿,GO内部处理这个事情。具体可以看一下 ./worker.go

zhaomin1993 commented 5 years ago
func (w *worker) shutdown() {
    // 置为繁忙状态
    atomic.StoreInt32(&w.isBusy, 1)
    // 可能存在任务
    select {
    case params := <-w.params: //如果该worker没有排队的任务但正在执行任务,比如这个任务还需要花费5s才能结束,这个case是感知不到的,会直接执行default,然后close(),虽然close()了但worker.process()并不会立即返回false,因为任务还需要5s才结束,5s后结束了在下一个循环时才会退出协程,这才是真相
        w.action(params)
    default:
    }
    // 关闭任务通道
    close(w.params)
}
/*
你可以试试下面这段测试代码,可以证明我的观点
*/
package factory

import (
    "fmt"
    "runtime"
    "testing"
    "time"
)

type hh struct {
    a int
}

//go test -v -test.run TestNewMaster
func TestNewMaster(t *testing.T) {
    master := NewMaster(1000, 1000)
    // 新建第一条工作流水线
    var line1 = master.AddLine("demo.line.1", func(args interface{}) {
        time.Sleep(time.Second*5)
        h := args.(hh)
        if h.a%1000 == 0 {
            fmt.Println("work out:  ",h.a)
        }
    })

    // 根据业务场景将参数提交
    for i := 0; i <= 3000; i++ {
        line1.Submit(hh{a:i})
    }

    // 协程池数量可动态调整
    //master.Running()            // 正在运行的协程工人数量
    //master.AdjustSize(100)      // 指定数量进行扩容或缩容
    master.Shutdown()           // 等于 master.AdjustSize(0)
    fmt.Println("showdown over, all work finished ?????????")
    fmt.Println(runtime.NumGoroutine())
    time.Sleep(time.Second*5)
    fmt.Println(runtime.NumGoroutine())
}
letsfire commented 5 years ago

你说的是对的,我提交了一个方案在 develop,帮忙Review 是否有问题,或者更好的方案。感谢