alitto / pond

🔘 Minimalistic and High-performance goroutine worker pool written in Go
MIT License

`group.Wait()` doesn't wait for all started jobs to finish unlike `pool.StopAndWait()` #68

Open mikegleasonjr opened 1 month ago

mikegleasonjr commented 1 month ago

Given this test:

package something

import (
    "context"
    "errors"
    "testing"
    "time"

    "github.com/alitto/pond"
    "github.com/stretchr/testify/assert"
)

func TestStopAndWait(t *testing.T) {
    workers := 2

    jobsStarted := []bool{false, false, false, false}
    jobsEnded := []bool{false, false, false, false}
    jobsDuration := []time.Duration{51 * time.Millisecond, 100 * time.Millisecond, 51 * time.Millisecond, 50 * time.Millisecond}
    jobsReturnValue := []error{nil, errors.New("error"), nil, nil}

    pool := pond.New(workers, len(jobsEnded))
    group, _ := pool.GroupContext(context.Background())

    for i := 0; i < len(jobsStarted); i++ {
        group.Submit(func() error {
            t.Log("job started", i+1)
            jobsStarted[i] = true
            defer func() {
                t.Log("job ended", i+1)
                jobsEnded[i] = true
            }()
            time.Sleep(jobsDuration[i])
            return jobsReturnValue[i]
        })
    }

    // uncomment this to make the test pass:
    // pool.StopAndWait()
    // or
    // time.Sleep(100 * time.Millisecond)

    err := group.Wait()
    assert.Equal(t, []bool{true, true, true, false}, jobsStarted, "unexpected jobsStarted")
    assert.Equal(t, []bool{true, true, true, false}, jobsEnded, "unexpected jobsEnded")
    assert.EqualError(t, err, "error")
}

It has the following output:

=== RUN   TestStopAndWait
    bug_report_test.go:26: job started 2
    bug_report_test.go:26: job started 1
    bug_report_test.go:29: job ended 1
    bug_report_test.go:26: job started 3
    bug_report_test.go:29: job ended 2        <---- job 3 still running but test ended
    bug_report_test.go:44: 
            Error Trace:    bug_report_test.go:44
            Error:          Not equal: 
                            expected: []bool{true, true, true, false}
                            actual  : []bool{true, true, false, false}

                            Diff:
                            --- Expected
                            +++ Actual
                            @@ -3,3 +3,3 @@
                              (bool) true,
                            - (bool) true,
                            + (bool) false,
                              (bool) false
            Test:           TestStopAndWait
            Messages:       unexpected jobsEnded
--- FAIL: TestStopAndWait (0.10s)

If we uncomment pool.StopAndWait() or time.Sleep(100 * time.Millisecond), the test passes:

=== RUN   TestStopAndWait
    bug_report_test.go:26: job started 2
    bug_report_test.go:26: job started 1
    bug_report_test.go:29: job ended 1
    bug_report_test.go:26: job started 3
    bug_report_test.go:29: job ended 2
    bug_report_test.go:29: job ended 3       <---- we waited for job 3
--- PASS: TestStopAndWait (0.15s)
PASS

What should happen (timeline):

0 ms ------------------ 50 ms ------------------ 100 ms ------------------ 150 ms
job 1 starts
job 2 starts
                   job 1 finishes
                    job 3 starts
                                           job 2 errors out
                                     (global context gets cancelled)
                                                                      job 3 finishes
job 4 never starts
-------------------------------------------------------------------------------

What happens:

group.Wait() returns as soon as job 2 fails (at about 100 ms), without waiting for job 3, which is still running.

Side effects:

If job 3 creates and returns resources that must be released (like an io.Closer), they leak: by the time job 3 finishes, group.Wait() has already returned, so nothing is left to receive its result and clean it up. It should not be up to each task to check whether the group has already stopped before handing its result back, and to clean up if it has.

What should happen:

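group.Wait() should behave like pool.StopAndWait() minus the stop: it should still return the first error, but only after every task that has already started has finished.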

Temporary fix:

    [...]
    pool := pond.New(workers, len(jobsEnded))
    group, _ := pool.GroupContext(context.Background())

    func() {
        defer pool.StopAndWait()

        for i := 0; i < len(jobsEnded); i++ {
            group.Submit(func() error {
                t.Log("job started", i+1)
                jobsStarted[i] = true
                defer func() {
                    t.Log("job ended", i+1)
                    jobsEnded[i] = true
                }()
                time.Sleep(jobsDuration[i])
                return jobsReturnValue[i]
            })
        }
    }()

    err := group.Wait()
    [...]
alitto commented 3 weeks ago

Hi @mikegleasonjr!

Yes, the behaviour you are describing is correct and it's by design. Unlike the StopAndWait() method on the worker pool, the context group's Wait() method returns as soon as either one of the tasks returns an error or all tasks complete successfully (nil error). These two methods are meant to be used in different circumstances.

Given that worker pool instances are meant to be reused across all goroutines (singleton), the StopAndWait() method is usually invoked when tearing down the application (e.g. after receiving an exit or kill signal). In this scenario, the goroutine that executes the shutdown procedure doesn't really expect to handle an error in one of the tasks that have been sent to the pool; all it does is wait for any pending tasks to complete (successfully or not). This is why this method doesn't return an error.

The context group's Wait() method, on the other hand, is meant to be called after submitting a batch of inter-related tasks (e.g. uploading a collection of images within the handler of a single HTTP request), which usually also means they share a context.Context. The semantics of this method are inspired by the errgroup package. When dealing with a bunch of tasks that are tied to the same context, you usually want to fail fast and return as soon as the first error occurs. Moreover, if the group's context.Context gets cancelled, any pending task is not executed.

That said, there's another kind of "group of tasks" that behaves the way you described: the one created with pool.Group(). If you create the group using this method, the Wait() method waits for all tasks to complete, regardless of any error. However, this kind of task group doesn't share a context.Context object, and task functions cannot return an error (each task is expected to handle errors internally).