gammazero / workerpool

Concurrency limiting goroutine pool
MIT License
1.33k stars 138 forks source link

How to get execution time of each job & run each job with a timeout? #35

Closed oze4 closed 4 years ago

oze4 commented 4 years ago

Putting this here for visibility...

Example of how to record "execution duration" + run each job with an "auto cancelling" timeout.

I understand this code is not perfect, so please feel free to suggest changes or supply your own improved version.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/gammazero/workerpool"
)

// JobResult is the outcome of one job: the payload the task produced plus
// bookkeeping (name, elapsed time, error) that is filled in around the task.
type JobResult struct {
    // Data is the task's payload. It is exported so a task can populate it
    // directly in the JobResult it returns.
    Data interface{}

    // The remaining fields are unexported so code outside this package can
    // only read them through the accessor methods below.
    err     error
    runtime time.Duration
    name    string
}

// Name returns the job's name. There is deliberately no setter: the name is
// meant to come from the Job that produced this result.
func (r *JobResult) Name() string { return r.name }

// Runtime returns how long the job was measured to run.
func (r *JobResult) Runtime() time.Duration { return r.runtime }

// Error returns the error recorded for the job, or nil if there is none.
// Note that this returns an error value, not a string, so *JobResult does
// not itself satisfy the built-in error interface.
func (r *JobResult) Error() error { return r.err }

// SetError records e as the job's error, replacing any previous value.
func (r *JobResult) SetError(e error) { r.err = e }

// Job describes one unit of work: a name to report it under and the
// function that does the work.
type Job struct {
    // Name identifies the job; wrapJob copies it onto the JobResult it emits.
    Name string
    // Task performs the work and returns its result. It takes no arguments,
    // so it receives no context or other cancellation signal.
    Task func() JobResult
}

// wrapJob adapts job into a func() that can be submitted to a worker pool.
//
// When the returned func runs, it executes job.Task with a deadline of
// timeout and sends exactly one JobResult on resultsChan before returning:
//
//   - if the task finishes in time, the task's own result with its name and
//     runtime filled in;
//   - otherwise a result whose error is context.DeadlineExceeded.
//
// The deadline and the runtime clock both start when the returned func is
// executed, not when wrapJob is called, so time a job spends waiting in the
// pool's queue does not count against it.
//
// Because Task takes no context, a task that overruns cannot be interrupted;
// it keeps running in its own goroutine and its late result is discarded.
// The send on resultsChan blocks if the channel is unbuffered or full, so the
// caller must size or drain it accordingly.
func wrapJob(timeout time.Duration, resultsChan chan JobResult, job Job) func() {
    return func() {
        startTime := time.Now()

        // Create the context here rather than in wrapJob so the timeout
        // measures execution time only.
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        defer cancel()

        // Buffered with capacity 1 so the task goroutine can always hand off
        // its result and exit, even if we stopped listening after a timeout.
        // The goroutine never touches resultsChan, so it cannot send a
        // duplicate result or send after the caller has closed the channel.
        finished := make(chan JobResult, 1)
        go func() {
            finished <- job.Task()
        }()

        // Exactly one branch runs, so exactly one result is emitted per job,
        // and it is emitted before this func returns to the pool.
        select {
        case result := <-finished:
            result.runtime = time.Since(startTime)
            result.name = job.Name
            resultsChan <- result
        case <-ctx.Done():
            // cancel is only called on return, so the only way to get here
            // is the deadline expiring: ctx.Err() is DeadlineExceeded.
            resultsChan <- JobResult{
                err:     ctx.Err(),
                name:    job.Name,
                runtime: time.Since(startTime),
            }
        }
    }
}

// jobs is the demo workload: one task that sleeps for 3 seconds and one that
// sleeps for 300 milliseconds. A task only fills in Data; it does not need to
// set the result's name.
var jobs = []Job{
    {
        Name: "job1",
        Task: func() JobResult {
            // Deliberately slow: with the 1 second timeout main uses, this
            // job is expected to be reported as a timeout error.
            time.Sleep(3 * time.Second)
            payload := map[string]string{"Whatever": "You want"}
            return JobResult{Data: payload}
        },
    },
    {
        Name: "job2",
        Task: func() JobResult {
            // Short enough to finish well inside a 1 second timeout.
            time.Sleep(300 * time.Millisecond)
            return JobResult{Data: "i am a result"}
        },
    },
}

// main runs the demo: it submits every entry of jobs to a worker pool through
// wrapJob, waits for the pool to finish, and prints one line per result with
// the job's runtime, name and either its data or its error.
func main() {
    const workerCount = 10

    // A single timeout shared by all jobs (it could equally be chosen per job).
    perJobTimeout := time.Second

    // Buffered with one slot per job so results can be collected after the
    // pool has finished rather than concurrently with it.
    results := make(chan JobResult, len(jobs))

    pool := workerpool.New(workerCount)
    for i := range jobs {
        pool.Submit(wrapJob(perJobTimeout, results, jobs[i]))
    }

    // Wait for the submitted jobs, then close the channel so the range
    // loop below terminates once the buffered results are drained.
    pool.StopWait()
    close(results)

    for res := range results {
        elapsedMs := int(res.Runtime() / time.Millisecond)

        // Always check the error first: a failed job carries no usable Data.
        if err := res.Error(); err != nil {
            fmt.Printf("[%dms] : '%s' : JobError : %s\n", elapsedMs, res.Name(), err)
            continue
        }
        fmt.Printf("[%dms] : '%s' : JobSuccess : %s\n", elapsedMs, res.Name(), res.Data)
    }
}
// Output:
// [303ms] : 'job2' : JobSuccess : i am a result
// [1001ms] : 'job1' : JobError : context deadline exceeded