sourcegraph / conc

Better structured concurrency for Go
https://about.sourcegraph.com/blog/building-conc-better-structured-concurrency-for-go
MIT License

Implement task groups and returning results #84

Open · PROger4ever opened 1 year ago

PROger4ever commented 1 year ago

Sometimes we need to run many tasks, grouped into small sets (each set smaller than the goroutine count). Since the sets are small, the efficient approach is to run the groups concurrently rather than one by one, but the results should still be grouped per set. Similar logic is implemented in [alitto/pond](https://github.com/alitto/pond#:~:text=group%20%3A%3D%20pool.Group()).
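
For reference, a minimal sketch of pond-style grouping (from memory of pond's v1 API; urls and fetch are placeholders, not part of this proposal):

import "github.com/alitto/pond"

// One shared worker pool; each group waits on just its own tasks.
workers := pond.New(10, 100) // 10 workers, task queue capacity 100
group := workers.Group()
for _, url := range urls {
    url := url // capture loop variable (pre-Go 1.22)
    group.Submit(func() { fetch(url) })
}
group.Wait() // blocks until this group's tasks finish, not the whole pool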

One use case is parsing search results:

s := stream.New().WithMaxGoroutines(10)
groups := make([]*conc.Group, 0, len(searchQueries))

for _, query := range searchQueries {
    query := query // capture loop variable (pre-Go 1.22)
    g := s.Group(query) // proposed API: a named sub-group of the stream
    groups = append(groups, g)

    for pageNumber := 1; pageNumber < 5; pageNumber++ {
        pageNumber := pageNumber
        g.Go(func() (res interface{}, err error) {
            res, err = getPageResults(query, pageNumber)
            return
        })
    }

    g.OnFinish(printGroupResults) // proposed API: async per-group callback
}

func printGroupResults(g *conc.Group, results []interface{}, err error) error {
    fmt.Printf("Results of search query %v: %v\n", g.GetId(), results)
    return nil
}

// or synchronously
for _, g := range groups {
    results := g.GetResults() // or g.Wait()
    _ = printGroupResults(g, results, nil)
}

Or how can I implement this logic in an easy way with the current functionality?

camdencheek commented 1 year ago

Have you seen ResultErrorPool? It looks like that will do a similar thing to the "synchronous" version at the bottom.
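
For example, one ResultErrorPool per query keeps the results grouped (a sketch; getPageResults and the interface{} result type are carried over from your snippet):

for _, query := range searchQueries {
    query := query // capture loop variable (pre-Go 1.22)
    // Pages of one query run concurrently; Wait returns the grouped
    // results plus any errors joined together.
    p := pool.NewWithResults[interface{}]().WithErrors().WithMaxGoroutines(10)
    for pageNumber := 1; pageNumber < 5; pageNumber++ {
        pageNumber := pageNumber
        p.Go(func() (interface{}, error) {
            return getPageResults(query, pageNumber)
        })
    }
    results, err := p.Wait()
    fmt.Printf("Results of search query %v: %v (err: %v)\n", query, results, err)
}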

Alternatively, a task submitted to a stream returns a closure that will be executed in the order submitted, so something like the following should work:

s := stream.New().WithMaxGoroutines(10)
for _, query := range searchQueries {
    for pageNumber := 1; pageNumber < 5; pageNumber++ {
        pageNumber := pageNumber
        query := query
        s.Go(func() stream.Callback {
            res, err := getPageResults(query, pageNumber)
            // Callbacks run serially in submission order, so output stays grouped.
            return func() { printGroupResults(query, pageNumber, res, err) }
        })
    }
}
s.Wait()

bobheadxi commented 1 year ago

I think the use case in the non-synchronous version is similar to what lib/group's Limiter provided. We use a Limiter in sg to execute groups of work concurrently under a shared, global limit. An abbreviated version:

    categoriesGroup = group.NewWithStreaming[error]()
    // checksLimiter is shared to limit all concurrent checks across categories.
    checksLimiter = group.NewBasicLimiter(r.Concurrency)

    // ...

    for i, category := range r.categories {
        category := category // capture the loop variable (pre-Go 1.22)
        // Run categories concurrently
        categoriesGroup.Go(func() error {
            // Run all checks for this category concurrently, with a shared
            // limit on how many checks can run together
            checksGroup := group.New().
                WithErrors().
                WithConcurrencyLimiter(checksLimiter)
            for _, check := range category.Checks {
                // run checks concurrently
                checksGroup.Go(func() error { ... })
            }

            return checksGroup.Wait()
        }, ...)
    }

This helps us show the user that all categories are running, while only r.Concurrency checks are running at the same time - it might be a bit of a funky way to do this, but it made sense to me when I wrote it 😅
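
With today's conc API there is no public shared limiter, but a rough equivalent is possible (a sketch, assuming a hand-rolled semaphore channel and a hypothetical runCheck in place of lib/group's BasicLimiter):

// Shared semaphore bounding check concurrency across all categories.
sem := make(chan struct{}, concurrency)

categories := pool.New().WithErrors()
for _, category := range r.categories {
    category := category // capture loop variable (pre-Go 1.22)
    categories.Go(func() error {
        checks := pool.New().WithErrors()
        for _, check := range category.Checks {
            check := check
            checks.Go(func() error {
                sem <- struct{}{}        // acquire a shared slot
                defer func() { <-sem }() // release it
                return runCheck(check)
            })
        }
        return checks.Wait()
    })
}
// err joins the failures from every category's checks.
err := categories.Wait()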

bobheadxi commented 1 year ago

We might be able to do something similar by allowing users to specify a Pool to use (kind of like the ForEachIn(*pool.Pool, ...) idea that was floated):

type PoolLike interface {
    unwrap() *Pool
}

func (p *Pool) InPool(pool PoolLike) *Pool {
    p.limiter = pool.unwrap().limiter
    return p
}

In the current design, I think the above will "just work" for the most part, since nobody ever closes limiter, and deref() by default shares the limiter.

Usage might look like:

s := stream.New().WithMaxGoroutines(10)
for _, query := range searchQueries {
    query := query // capture loop variable (pre-Go 1.22)
    s.Go(func() stream.Callback {
        // all groups, combined, can use up to the 10 goroutines in s
        g := pool.NewWithResults[interface{}]().WithErrors().InPool(s)
        for pageNumber := 1; pageNumber < 5; pageNumber++ {
            pageNumber := pageNumber
            g.Go(func() (res interface{}, err error) {
                res, err = getPageResults(query, pageNumber)
                return
            })
        }
        groupResults, err := g.Wait()
        return func() {
            printGroupResults(...)
        }
    })
}
s.Wait()

The awkward bit in this example is that s.Go consumes a worker that could be used by g.Go, but maybe one could create a separate Pool that does nothing but act as a limiter.
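
A rough sketch of that workaround under the same hypothetical InPool API: the limiter pool never receives tasks of its own, so submitting the groups from plain goroutines (here via sync.WaitGroup) avoids s.Go eating a slot the groups could use:

// Hypothetical: "limiter" only bounds the groups' combined concurrency.
limiter := pool.New().WithMaxGoroutines(10)

var wg sync.WaitGroup
for _, query := range searchQueries {
    query := query // capture loop variable (pre-Go 1.22)
    wg.Add(1)
    go func() {
        defer wg.Done()
        // Each group borrows goroutine slots from the shared limiter pool.
        g := pool.NewWithResults[interface{}]().WithErrors().InPool(limiter)
        for pageNumber := 1; pageNumber < 5; pageNumber++ {
            pageNumber := pageNumber
            g.Go(func() (interface{}, error) {
                return getPageResults(query, pageNumber)
            })
        }
        results, err := g.Wait()
        fmt.Printf("Results of search query %v: %v (err: %v)\n", query, results, err)
    }()
}
wg.Wait()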