sourcegraph / conc

Better structured concurrency for go
https://about.sourcegraph.com/blog/building-conc-better-structured-concurrency-for-go
MIT License
8.96k stars, 307 forks

Implement goroutines state monitor? #85

Open PROger4ever opened 1 year ago

PROger4ever commented 1 year ago

Sometimes the states of the running goroutines need to be printed to the console. Similar functionality exists in hmdsefi/gowl.

Usage proposal:

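p := pool.New().WithMaxGoroutines(10).WithMonitor(monitor, time.Second)
p.Go(func(tp *threadParameter) (res interface{}, err error) {
    tp.setState("Doing work1...")
    // Stop early so a work1 failure is not overwritten by work2's result.
    if err = work1(); err != nil {
        return nil, err
    }

    tp.setState("Doing work2...")
    res, err = work2()
    return res, err
})

// monitor is called periodically with the current state of every goroutine.
func monitor(ts []*threadState) error {
    // Fprintf returns (n, err); only the error is relevant here.
    _, err := fmt.Fprintf(os.Stderr, "Thread states: %v\n", ts)
    return err
}

PROger4ever commented 1 year ago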

Also, a job could be a struct with a Go() method instead of a func() (or even an interface with a default struct implementation). That would make it easy to pass arbitrary data to a goroutine.

camdencheek commented 1 year ago

Hi @PROger4ever! Thanks for the idea.

I'm not sure it would be possible to design this in a way that is general enough to be broadly useful without adding significant surface area to the API of the library. For example, what if you need to know something other than a string? Or what if you don't want to monitor every second, but instead want to print to stderr every time a SIGHUP is sent?

I think this can be implemented more generally outside of the library, in a way that doesn't significantly constrain a user's choice of how to monitor. The monitoring hooks can be passed into the closure, which leaves control in the hands of the user.

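package main

import (
    "context"
    "fmt"
    "os"
    "sync/atomic"
    "time"

    "github.com/sourcegraph/conc/pool"
)

func main() {
    p := pool.New()
    // One slot per task; each task publishes its current state atomically.
    states := make([]atomic.Value, 10)
    for i := 0; i < 10; i++ {
        i := i
        p.Go(func() {
            states[i].Store("Doing work 1...")
            time.Sleep(time.Second)
            states[i].Store("Doing work 2...")
            time.Sleep(time.Second)
        })
    }

    // The monitor runs in its own pool and stops when monitorCtx is cancelled.
    monitorCtx, cancelMonitor := context.WithCancel(context.Background())
    monitorPool := pool.New().WithContext(monitorCtx)
    monitorPool.Go(func(ctx context.Context) error {
        for {
            select {
            case <-ctx.Done():
                return nil
            case <-time.After(100 * time.Millisecond):
                for i := range states {
                    // Load returns nil until the task has stored its first state.
                    fmt.Fprintf(os.Stderr, "%d: %v\n", i, states[i].Load())
                }
            }
        }
    })

    // Wait for the workers first, then shut the monitor down.
    p.Wait()
    cancelMonitor()
    monitorPool.Wait()
}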
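
The two questions raised above (state that is more than a string, and printing on SIGHUP instead of on a timer) can both be handled outside the library in the same way. The following is only a minimal sketch under some assumptions: `taskState`, `stateBoard`, and `dumpOnSignal` are hypothetical names that are not part of conc, plain goroutines with a `sync.WaitGroup` stand in for the pool so the example depends only on the standard library, `atomic.Pointer` requires Go 1.19 or later, and SIGHUP delivery assumes a Unix-like system.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

// taskState is a hypothetical structured state; any type could be used here.
type taskState struct {
	Step  string
	Done  int
	Total int
}

// stateBoard holds one atomically updated slot per worker.
type stateBoard struct {
	slots []atomic.Pointer[taskState]
}

func newStateBoard(n int) *stateBoard {
	return &stateBoard{slots: make([]atomic.Pointer[taskState], n)}
}

// Set publishes the current state of worker i.
func (b *stateBoard) Set(i int, s taskState) {
	b.slots[i].Store(&s)
}

// Snapshot returns one formatted line per worker; unset slots read as idle.
func (b *stateBoard) Snapshot() []string {
	lines := make([]string, len(b.slots))
	for i := range b.slots {
		if s := b.slots[i].Load(); s != nil {
			lines[i] = fmt.Sprintf("%d: %s (%d/%d)", i, s.Step, s.Done, s.Total)
		} else {
			lines[i] = fmt.Sprintf("%d: idle", i)
		}
	}
	return lines
}

// dumpOnSignal writes a snapshot to w each time a value arrives on sig,
// and returns once ctx is cancelled.
func dumpOnSignal(ctx context.Context, sig <-chan os.Signal, b *stateBoard, w io.Writer) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-sig:
			for _, line := range b.Snapshot() {
				fmt.Fprintln(w, line)
			}
		}
	}
}

func main() {
	board := newStateBoard(3)

	// Deliver SIGHUP to a channel instead of letting it terminate the process.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGHUP)
	defer signal.Stop(sig)

	ctx, cancel := context.WithCancel(context.Background())
	monitorDone := make(chan struct{})
	go func() {
		defer close(monitorDone)
		dumpOnSignal(ctx, sig, board, os.Stderr)
	}()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			board.Set(i, taskState{Step: "work 1", Done: 0, Total: 2})
			time.Sleep(500 * time.Millisecond)
			board.Set(i, taskState{Step: "work 2", Done: 1, Total: 2})
			time.Sleep(500 * time.Millisecond)
		}()
	}

	// Wait for the workers, then stop the monitor.
	wg.Wait()
	cancel()
	<-monitorDone
}
```

Because the trigger is just a channel, the same `dumpOnSignal` loop could be driven by a `time.Ticker` or any other event source, and the worker goroutines could equally be submitted through `p.Go` as in the example above.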

camdencheek commented 1 year ago

> Also, a job can be a struct with method Go() instead of func() (or even an interface with default struct-implementation)

You can already pretty easily do this just by passing the struct's method to the pool.Go() call.

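package main

import "github.com/sourcegraph/conc/pool"

// myTask carries the data its goroutine needs.
type myTask struct {
    val int
}

// Run has the func() signature that pool.Go expects.
func (t myTask) Run() {
    println(t.val)
}

func main() {
    p := pool.New()
    for i := 0; i < 10; i++ {
        task := myTask{i}
        // task.Run is a method value bound to this copy of task.
        p.Go(task.Run)
    }
    p.Wait()
}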
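
The interface variant mentioned in the quoted comment can be built on the same method-value idea. This is a rough sketch, not something conc provides: `Task`, `sumTask`, and `submitAll` are hypothetical names, and `submit` is assumed to be any function with the shape `func(func())`, which is the shape of `p.Go` on a basic pool. A `sync.WaitGroup` wrapper stands in for the pool so the example runs with the standard library alone.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a hypothetical job interface; the pool itself only needs a func().
type Task interface {
	Run()
}

// sumTask carries its own input and a pointer to where the result goes.
type sumTask struct {
	nums []int
	out  *int
}

// Run adds up nums and stores the total in out.
func (t sumTask) Run() {
	total := 0
	for _, n := range t.nums {
		total += n
	}
	*t.out = total
}

// submitAll hands each task's Run method to submit.
// The method value t.Run is bound to the current task when it is evaluated.
func submitAll(submit func(func()), tasks []Task) {
	for _, t := range tasks {
		submit(t.Run)
	}
}

func main() {
	var wg sync.WaitGroup
	// submit stands in for p.Go: it runs f in a new goroutine.
	submit := func(f func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f()
		}()
	}

	// Each task writes to its own element, so the writes do not race.
	results := make([]int, 2)
	tasks := []Task{
		sumTask{nums: []int{1, 2, 3}, out: &results[0]},
		sumTask{nums: []int{10, 20}, out: &results[1]},
	}

	submitAll(submit, tasks)
	wg.Wait()
	fmt.Println(results) // prints [6 30]
}
```

With conc, the call would presumably become `submitAll(p.Go, tasks)` followed by `p.Wait()`, which keeps the interface entirely in user code.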