alitto / pond

🔘 Minimalistic and High-performance goroutine worker pool written in Go
MIT License

Exceptions may occur when closing the pool #20

Closed: zhu121 closed this issue 2 years ago

zhu121 commented 2 years ago

Assume the pool has a limited size and is busy. If the pool is closed while an external goroutine is still submitting tasks to it, the program panics.

Test code:

package pond

import (
    "fmt"
    "testing"
    "time"
)

func TestStop(t *testing.T) {
    pool := New(1, 4, MinWorkers(1), IdleTimeout(1*time.Second))
    // Simulate some users sending tasks to the pool
    go func() {
        for i := 0; i < 30; i++ {
            pool.Submit(func() {
                fmt.Println("do task")
                time.Sleep(3 * time.Second)
            })
        }
    }()
    // Suppose the server is shut down after a period of time
    time.Sleep(5 * time.Second)
    pool.StopAndWait()
}

panic: send on closed channel
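The panic comes from Go's channel semantics rather than anything unusual in the test: sending on a closed channel panics, and that includes a send that was already blocked when the channel was closed. The standalone sketch below assumes, as the panic message suggests, that the pool queues tasks on a channel that StopAndWait() closes; tasks and sendAfterClose are hypothetical names, not pond's internals.

```go
package main

import (
	"fmt"
	"time"
)

// sendAfterClose starts a producer that blocks sending on an unbuffered
// channel nobody reads, then closes the channel underneath it. It returns
// the value recovered from the producer's panic.
func sendAfterClose() interface{} {
	tasks := make(chan func()) // hypothetical stand-in for a pool's task queue
	result := make(chan interface{})
	go func() {
		defer func() { result <- recover() }()
		tasks <- func() {} // blocks: no worker is receiving
	}()
	time.Sleep(50 * time.Millisecond) // give the producer time to block
	close(tasks)                      // "stop the pool" while a send is pending
	return <-result
}

func main() {
	fmt.Println(sendAfterClose()) // prints: send on closed channel
}
```

Whether the close happens before or after the producer reaches the send, the send panics, so the outcome does not depend on scheduling.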

alitto commented 2 years ago

Hey @zhu121,

This is the expected behaviour in that scenario. When the process is shutting down, it's the application's responsibility to make sure no more tasks are submitted to the pool after it has been stopped (by calling StopAndWait()).

A common pattern to signal a goroutine to stop its processing (in this case, stop sending tasks to the pool) is to use an "exit" channel, doing something like this:

package pond

import (
    "fmt"
    "testing"
    "time"
)

func TestStop(t *testing.T) {
    pool := New(1, 30, MinWorkers(1), IdleTimeout(1*time.Second))

    // Simulate some users sending tasks to the pool
    quit := make(chan struct{})
    go func() {
        for i := 0; i < 30; i++ {
            select {
            case <-quit:
                // Quit signal received, stop sending tasks
                return
            default:
                pool.Submit(func() {
                    fmt.Println("do task")
                    time.Sleep(1 * time.Second)
                })
            }
            time.Sleep(1 * time.Second)
        }
    }()

    // Suppose the server is shut down after a period of time
    time.Sleep(5 * time.Second)

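    // Send exit signal. The channel is unbuffered, so this blocks until the
    // producer receives it; the producer then returns without submitting
    // anything else
    quit <- struct{}{}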

    // Wait for all tasks to complete
    pool.StopAndWait()
}
zhu121 commented 2 years ago

Another approach I can think of is to modify the submit function like this:

func (p *WorkerPool) submit(task func(), canWaitForIdleWorker bool) (submitted bool) {
    defer func() {
        // Recover from a panic (e.g. send on a closed channel) so it does not crash the caller's goroutine, and report the task as not submitted
        if r := recover(); r != nil {
            submitted = false
        }
    }()
    // ... rest of the existing submit logic
}
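The suggestion can be tried outside the library with a plain channel. In this sketch, submitRecover is a hypothetical helper rather than pond's submit(): the deferred recover turns the send-on-closed-channel panic into submitted == false through the named return value.

```go
package main

import "fmt"

// submitRecover sends task on tasks and reports whether it was accepted.
// If tasks has been closed, the send panics; the deferred recover converts
// that panic into submitted == false instead of crashing the caller.
func submitRecover(tasks chan<- func(), task func()) (submitted bool) {
	defer func() {
		if r := recover(); r != nil {
			submitted = false
		}
	}()
	tasks <- task
	return true
}

func main() {
	tasks := make(chan func(), 1)
	fmt.Println(submitRecover(tasks, func() {})) // true: buffer has room
	close(tasks)
	fmt.Println(submitRecover(tasks, func() {})) // false: channel is closed
}
```

One trade-off of a bare recover() is that it swallows any panic raised inside the function, not only the closed-channel one.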
alitto commented 2 years ago

I see. I agree that submitting a task to a closed worker pool should not panic if the call is made using the TrySubmit() method, which should return false in this case. However, it should still panic if the call is made using the Submit() method, since that one is meant to block until the task is queued and panic otherwise. I'll look at it more closely when I have some time, but feel free to open a pull request with the suggested changes in the meantime. Thanks!
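The TrySubmit() behaviour described here (return false instead of panicking once the pool is stopped) can be sketched without recover() at all. This is a hypothetical stoppableQueue, not how pond or its pull request implements it: Stop closes the channel under a write lock, and TrySubmit checks a stopped flag under a read lock before a non-blocking send.

```go
package main

import (
	"fmt"
	"sync"
)

// stoppableQueue is a hypothetical sketch, not pond's code: a bounded task
// queue whose TrySubmit never panics, even after Stop has been called.
type stoppableQueue struct {
	mu      sync.RWMutex // Stop takes the write lock, TrySubmit the read lock
	stopped bool
	tasks   chan func()
}

func newStoppableQueue(capacity int) *stoppableQueue {
	return &stoppableQueue{tasks: make(chan func(), capacity)}
}

// TrySubmit enqueues task without blocking. It returns false if the queue
// has been stopped or is currently full, leaving the decision to the caller.
func (q *stoppableQueue) TrySubmit(task func()) bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.stopped {
		return false // checked under the lock, so the channel is still open
	}
	select {
	case q.tasks <- task:
		return true
	default:
		return false // queue full
	}
}

// Stop marks the queue as stopped and closes the channel exactly once.
// Holding the write lock guarantees no TrySubmit is mid-send at that moment.
func (q *stoppableQueue) Stop() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.stopped {
		q.stopped = true
		close(q.tasks)
	}
}

func main() {
	q := newStoppableQueue(1)
	fmt.Println(q.TrySubmit(func() {})) // true
	fmt.Println(q.TrySubmit(func() {})) // false: queue full
	q.Stop()
	fmt.Println(q.TrySubmit(func() {})) // false: stopped
}
```

This only works because TrySubmit never blocks while holding the read lock. A blocking Submit could not reuse it as-is: a sender parked on a full queue would keep Stop waiting for the write lock.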

alitto commented 2 years ago

Hey @zhu121! I finally had some time to work on this and opened a pull request (https://github.com/alitto/pond/pull/22) to improve the handling of task submission while the pool is being stopped. The latest release of pond (1.6.1) includes these changes. Please take a look when you get a chance and let me know whether that's in line with your suggestion.

zhu121 commented 2 years ago

A small idea: would adding recover() to the deferred function in submit() reduce the mental burden on users? Currently, when calling TrySubmit() you need to check for false to stop submitting tasks, and when calling Submit() you need to add your own recover() to stop submitting tasks and make sure the program does not terminate abnormally.

alitto commented 2 years ago

Adding recover() to the Submit() function and returning an error instead of panicking would change its signature from func Submit() to func Submit() error, which kind of breaks the current contract and could impact existing users of the library. A call to Submit() can only fail if the pool has been stopped manually by calling Stop() or StopAndWait(), so it's meant to be used when you are confident the pool will not be stopped unexpectedly. TrySubmit() is meant to be a non-blocking alternative to Submit(), which lets the caller decide what to do if the task was not accepted by the pool.
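Without changing the library's signature, a caller who prefers an error over a panic can wrap the call site instead. submitErr is a hypothetical helper; the sketch assumes the pool's Submit has the shape func(task func()), so a method value such as pool.Submit could be passed in directly. Here a plain channel send stands in for it.

```go
package main

import "fmt"

// submitErr calls submit(task) and converts a panic into an error, leaving
// the original Submit signature untouched.
func submitErr(submit func(func()), task func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task rejected: %v", r)
		}
	}()
	submit(task)
	return nil
}

func main() {
	tasks := make(chan func(), 1)
	// Hypothetical stand-in for pool.Submit: a send on the task channel.
	submit := func(t func()) { tasks <- t }

	fmt.Println(submitErr(submit, func() {})) // <nil>
	close(tasks)
	fmt.Println(submitErr(submit, func() {})) // task rejected: send on closed channel
}
```

This keeps the blocking Submit() contract for existing users while letting an individual call site opt into error handling.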

zhu121 commented 2 years ago

Modifying the function signature directly would indeed affect existing users. Thank you for your answer. I'll close this issue for now.