golang / go

The Go programming language
https://go.dev
BSD 3-Clause "New" or "Revised" License
123.66k stars 17.62k forks source link

proposal: sync: add package sync/workerpool #53044

Open tniswong opened 2 years ago

tniswong commented 2 years ago

sync/workerpool

This package would provide a standardized concurrent worker pool implementation with a simple task interface.

package workerpool

type Task interface {
    Invoke(ctx context.Context) error
}

type WorkerPool struct {}

func New(n int64) WorkerPool
func (p *WorkerPool) Push(t Task, opts ...TaskOption)
func (p *WorkerPool) Run(ctx context.Context)
func (p WorkerPool) Wait()

Example

package main

import (
    "context"
    "golang.org/x/sync/workerpool"
)

func main() {

    wp := workerpool.New(2)
    ctx, cancel := context.WithCancel(context.Background())

    go wp.Run(ctx) // runs until context is cancelled

    // wp.Push(Task 1)
    // wp.Push(Task 2)

    wp.Wait() // blocks until all pending tasks are complete, but does not stop workerpool goroutine
    cancel() // stops the workerpool

    // wait for the workerpool to be stopped
    select {
    case <-ctx.Done():
    }

}

Reasoning

While there are many overly simplistic examples published on the internet, the problem space gains difficulty quickly when trying to write a more robust custom implementation. I believe the community would benefit greatly by having such a robust implementation widely available in the standard library.

I've written github.com/tniswong/workerpool as a draft design that I offer up as a candidate implementation. This design uses golang.org/x/sync/semaphore for bounding the concurrent workers.

Design Notes

References

https://brandur.org/go-worker-pool https://gobyexample.com/worker-pools https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0 https://medium.com/code-chasm/go-concurrency-pattern-worker-pool-a437117025b1 https://golangbot.com/buffered-channels-worker-pools/

https://github.com/gammazero/workerpool https://github.com/alitto/pond https://github.com/cilium/workerpool https://github.com/vardius/worker-pool

ZekeLu commented 2 years ago

It looks like that https://pkg.go.dev/golang.org/x/sync/errgroup has most of the features described here. Have you considered improving https://pkg.go.dev/golang.org/x/sync/errgroup instead of adding a new one?

tniswong commented 2 years ago

Being honest, I didn't know that package existed. My searches related to the worker pool pattern never once surfaced sync/errgroup as an option.

While I admit there is definitely significant feature overlap upon first look, I'm not quite convinced these aren't two distinct use-cases.

By my (possibly naïve) understanding of worker pools, a pool simply (and continually) dispatches tasks to a set number of concurrent workers regardless of task outcome, and remains available to run future tasks until the pool is stopped. I believe that sync/errgroup (and I've only briefly studied this package so very possible I'm wrong here), on the other hand, will terminate upon encountering an error by any task, and only remains usable so long as no task has errored.

Assuming my observations are true, I can see distinct value provided by the approach of sync/errgroup and that of this proposal.

bcmills commented 2 years ago

The Run method makes this API prone to goroutine leaks and synchronization bugs: it is too easy to accidentally leak the Run goroutine, especially given that it doesn't come with a mechanism to way for that goroutine to finish. (For more detail, see my GopherCon '18 talk, particularly starting around slide 75.)

That leaves New, Push, and Wait, which are analogous to errgroup.Group's SetLimit (#27837), Go, and Wait respectively, but you are correct that errgroup specifically focuses on error aggregation (to facilitate fork/join-style concurrency) whereas the API proposed here intentionally does not (to facilitate reuse).

That makes it more similar to cmd/go/internal/par.Queue, which provides only NewQueue(maxActive int), Add(func()), and Idle() <-chan struct{}. I think that is the more appropriate API here — it has the same concurrency-limiting properties, but without the leak-prone Run method and with a somewhat more flexible way to select on completion.

tniswong commented 2 years ago

The Run method makes this API prone to goroutine leaks and synchronization bugs: it is too easy to accidentally leak the Run goroutine, especially given that it doesn't come with a mechanism to way for that goroutine to finish. (For more detail, see my GopherCon '18 talk, particularly starting around slide 75.)

This actually had crossed my mind, but I decided to punt pending some feedback. One thought was to add a func (p WorkerPool) WaitStop() to wait for the pool itself to stop gracefully. Also, this design doesn't require that Run(context.Context) be run as a goroutine, it can absolutely be called normally and will block until the context is cancelled. Tasks can also be added via Push() before the call to Run(context.Context) or from another goroutine, but I'm not sure that alleviates your concerns.

After learning about the existence sync/errgroup, I've been reading through it's history and came across your talk. Will be listening to it tonight. In the mean time, very interested in hearing other ideas on how to combat this.

That leaves New, Push, and Wait, which are analogous to errgroup.Group's SetLimit (#27837), Go, and Wait respectively, but you are correct that errgroup specifically focuses on error aggregation (to facilitate fork/join-style concurrency) whereas the API proposed here intentionally does not (to facilitate reuse).

That makes it more similar to cmd/go/internal/par.Queue, which provides only NewQueue(maxActive int), Add(func()), and Idle() <-chan struct{}. I think that is the more appropriate API here — it has the same concurrency-limiting properties, but without the leak-prone Run method and with a somewhat more flexible way to select on completion.

I'll need to do some studying of par.Queue to competently comment, but my understanding is that it is an internal package, thus not available for use. Are you suggesting making that API commonly available or using it as an influence to modify this proposal's defined API?

On first look, the ergonomics of par.Queue seem a bit more "clever" than I was going for. The WaitGroup-style Wait() feels very clear and natural.

Thanks for the feedback!

tniswong commented 1 year ago

I've finally had some time to revisit this and have incorporated feedback from @bcmills (thank you, btw):

sync/workerpool

package workerpool

type Task interface {
    Invoke(ctx context.Context) error
}

type WorkerPool struct {}

func New(n int64) WorkerPool
func (p *WorkerPool) Push(t Task, opts ...TaskOption)
func (p *WorkerPool) Run(ctx context.Context) <-chan struct{}

Example

package main

import (
    "context"
    "golang.org/x/sync/workerpool"
    "time"
)

func main() {

    pool := workerpool.New(2)
    ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
    defer cancel()

    done := wp.Run(ctx) // runs until context is cancelled

    // pool.Push(Task 1)
    // pool.Push(Task 2)

    // block until the workerpool is stopped and done channel is closed
    <-done

}

I have also updated my reference implementation:

https://pkg.go.dev/github.com/tniswong/workerpool/v2@v2.0.0 https://github.com/tniswong/workerpool/tree/master/v2

Merovius commented 1 month ago

I believe that sync/errgroup (and I've only briefly studied this package so very possible I'm wrong here), on the other hand, will terminate upon encountering an error by any task, and only remains usable so long as no task has errored.

If an errgroup.Group has no context.Context, it does not terminate on errors. And I'm not sure what you mean by "usable", but a quick test seems to suggests that it works fine.

So, while I agree that it isn't necessarily very visible, I still believe errgroup already provides what most people want from a "worker pool" and I have, whenever it came up, advocated its use for this purpose.

In regards to your proposal, I don't really like the API, personally.

Having the interface doesn't seem to really serve a purpose. It can just as well be a func(context.Context) error. Or even just func() error and close over the context.Context (I don't think the pool does anything with it?). Invoke(context.Context) error isn't really a natural method name for a type to have already, so there is also no benefit in that you can thus use this with existing types.

As it is, I also think the error return is problematic. Your proposal doesn't mention actual TaskOptions, so it is literally just ignored, but extrapolating from your implementation, it seems that it is only used for the retrying and otherwise dropped on the floor. In particular, there doesn't seem to be a way to even tell that a task has failed finally (when used with RetryMax). I think in general, it encourages naive and thus broken usage.

I also don't really like the functional options. It suggests that this will end up with a bit of a kitchen-sink problem. I don't think the standard library can reasonably predict anything a user might want to do here that could be encoded into an option. So we'll probably end up constantly getting proposals to add new TaskOptions to make one or another use case easier.

The thing I like about the errgroup API is really, that it is very simple, but also very flexible. As demonstrated by the fact that it can work as a worker pool as proposed here. Personally, I'd be more in favor of moving that into the stdlib sync to make it more visible and document this use case more loudly.

tniswong commented 1 month ago

@Merovius thank for the feedback. Let me clarify a few things:

If an errgroup.Group has no context.Context, it does not terminate on errors. And I'm not sure what you mean by "usable", but a quick test seems to suggests that it works fine.

You're correct here, at the time of writing that comment, my understanding of errgroup was limited, so my mistake.

I'm less concerned with the specifics of my proposed reference implementation, this one just happened to suit my needs at the time. I'm more interested in having a robust worker pool implementation in the stdlib that requires minimal boilerplate.

As the errgroup documentation states:

A Group is a collection of goroutines working on subtasks that are part of the same overall task.

This proposal is more aimed at processing a queue of an unknown number of unrelated tasks that are submitted asynchronously that each have independent results.

I can imagine scenarios where I want add rate limits to N tasks complete per sec, resubmit tasks that were queued but didn't run, retries, dynamically adjust concurrency and rate limits, so on and soforth. That said, I'm trying to start with a simpler implementation so it can be thoughtfully expanded.

Having the interface doesn't seem to really serve a purpose. It can just as well be a func(context.Context) error. Or even just func() error and close over the context.Context (I don't think the pool does anything with it?). Invoke(context.Context) error isn't really a natural method name for a type to have already, so there is also no benefit in that you can thus use this with existing types.

I don't really disagree w/ this, but I do find some marginal utility in the interface definition. Similar to http.Handler and http.HandlerFunc, I think it can go either way, or both ways, especially if I want to define a type to represent the task for whatever reason.

As it is, I also think the error return is problematic. Your proposal doesn't mention actual TaskOptions, so it is literally just ignored, but extrapolating from your implementation, it seems that it is only used for the retrying and otherwise dropped on the floor. In particular, there doesn't seem to be a way to even tell that a task has failed finally (when used with RetryMax). I think in general, it encourages naive and thus broken usage.

Yeah I think the error return on the Task is really just to facilitate the retry mechanic I provided. I could take it or leave it, I just found that mechanic useful for my intended use. Could very easily be omitted here.

Thanks again for the feedback, great thoughts.