gammazero / workerpool

Concurrency limiting goroutine pool
MIT License

Get response from all worker pulls #34

Closed JennyMet closed 4 years ago

JennyMet commented 4 years ago

Hi,

I want to run multiple requests and get a single response (a struct or something similar) once they have all finished. For example, if I have 10 requests and the longest takes 10 seconds, then after 10 seconds I want to get all of the responses, each with an ID or something similar, so I know which requests succeeded and which failed. Is that possible?

thanks

oze4 commented 4 years ago

I am not sure if this is the best approach, but wrapping each "job" in a closure and returning a custom "response type" solves this:

package main

import (
    "errors"
    "fmt"
    "time"

    "github.com/gammazero/workerpool"
)

// Response holds job response
type Response struct {
    ID    int
    Error error
    Data  interface{}
}

// wrapJob sends the result of our job to our results channel
func wrapJob(rc chan Response, f func() Response) func() {
    return func() { rc <- f() }
}

var jobs = []func() Response{
    func() Response {
        // do whatever
        // ...
        time.Sleep(time.Second * 3)
        return Response{ID: 1, Data: map[string]string{"hello": "mars"}}
    },
    func() Response {
        // do whatever
        // ...
        time.Sleep(time.Second)
        resp := Response{ID: 2}
        err := errors.New("some error") // simulate error
        if err != nil {
            resp.Error = err
            return resp
        }
        resp.Data = "some result that you may or may not see. depends if there is an error or not."
        return resp
    },
}

func main() {
    wp := workerpool.New(10)
    results := make(chan Response, len(jobs))

    for _, job := range jobs {
        wp.Submit(wrapJob(results, job))
    }

    wp.StopWait()

    close(results)

    for result := range results {
        if result.Error != nil {
            fmt.Printf("Found job with an error : ID '%d' : Error '%s'\n", result.ID, result.Error.Error())
        } else {
            fmt.Printf("Success : ID '%d' : Data '%+v'\n", result.ID, result.Data)
        }
    }
}

Which prints:

// Found job with an error : ID '2' : Error 'some error'
// Success : ID '1' : Data 'map[hello:mars]'

gammazero commented 4 years ago

@JennyMet Yes, that is certainly doable. A simple way to do what you want is to submit a function to workerpool that calls your job function with an id and a channel to put results on.

wp.Submit(func() {
    myFunc(id, resultChan)
})
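To make the snippet above concrete, here is a minimal sketch of submitting several jobs in a loop so that each one reports under its own id. The names `Result` and `submitAll` are hypothetical, and `myFunc` is only a placeholder body. A small goroutine-based `submit` function stands in for `wp.Submit` so the sketch runs without the library; with a real pool you would presumably pass `wp.Submit` instead.

```go
package main

import (
	"fmt"
	"sync"
)

// Result is a hypothetical result type that carries the id of its job.
type Result struct {
	ID   int
	Data string
}

// myFunc is a placeholder for the job function: it does its work and
// then reports on resultChan under the id it was given.
func myFunc(id int, resultChan chan<- Result) {
	resultChan <- Result{ID: id, Data: fmt.Sprintf("output of job %d", id)}
}

// submitAll queues one job per id through submit, which is expected to
// behave like wp.Submit (run the function on some worker goroutine).
func submitAll(submit func(func()), jobCount int, resultChan chan<- Result) {
	for i := 1; i <= jobCount; i++ {
		id := i // per-iteration copy; needed before Go 1.22, harmless after
		submit(func() { myFunc(id, resultChan) })
	}
}

func main() {
	const jobCount = 3
	// Buffered to jobCount so that no job blocks when sending its result.
	resultChan := make(chan Result, jobCount)

	// Stand-in for wp.Submit (an assumption made to keep this dependency-free).
	var wg sync.WaitGroup
	submit := func(f func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f()
		}()
	}

	submitAll(submit, jobCount, resultChan)
	wg.Wait()
	close(resultChan)

	// Results arrive in completion order, so the id is what ties each
	// result back to its request.
	for r := range resultChan {
		fmt.Println(r.ID, r.Data)
	}
}
```

Copying the loop variable into `id` matters on Go versions before 1.22, where every closure would otherwise share the same variable and could all report the last id.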

After all the jobs are submitted, you can use select to wait on the result channel or on a timeout. Keep collecting results from the channel until you have collected all the results you expect, or until you time out waiting.

timeout := time.After(10 * time.Second)
for i := 0; i < jobCount; i++ {
    select {
    case r := <-resultChan:
        handleResult(r)
    case <-timeout:
        return fmt.Errorf("timed out waiting for %d results", jobCount - i)
    }
}
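The loop above is a fragment, so here is one way it could be wrapped into a runnable helper. This is a sketch under stated assumptions: `Result` and `collect` are hypothetical names, and plain goroutines stand in for jobs submitted with `wp.Submit`. Unlike the fragment, it returns the results that did arrive along with the timeout error, so the caller can still see which ids completed.

```go
package main

import (
	"fmt"
	"time"
)

// Result is a hypothetical per-job outcome; ID says which job produced it.
type Result struct {
	ID  int
	Err error
}

// collect reads up to n results from rc and gives up once timeout has
// elapsed. It returns whatever arrived in time, plus an error if any
// results are still missing.
func collect(rc <-chan Result, n int, timeout time.Duration) ([]Result, error) {
	deadline := time.After(timeout) // one deadline shared by every receive
	got := make([]Result, 0, n)
	for len(got) < n {
		select {
		case r := <-rc:
			got = append(got, r)
		case <-deadline:
			return got, fmt.Errorf("timed out waiting for %d results", n-len(got))
		}
	}
	return got, nil
}

func main() {
	const jobCount = 2
	// Buffered to jobCount so a late job can still send after collect gives up.
	resultChan := make(chan Result, jobCount)

	// Plain goroutines stand in for wp.Submit to keep the sketch dependency-free.
	go func() { resultChan <- Result{ID: 1} }() // fast job
	go func() {
		time.Sleep(time.Second) // slow job, expected to miss the deadline
		resultChan <- Result{ID: 2}
	}()

	got, err := collect(resultChan, jobCount, 200*time.Millisecond)
	fmt.Printf("collected %d result(s), err: %v\n", len(got), err)
}
```

Sizing the channel buffer to the number of jobs is a deliberate choice here: a job that finishes after the timeout can still send its result and exit, instead of blocking forever on a channel nobody is reading.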

Here is some code that expands on the idea presented by @oze4. It shows a way to separate the generic mechanics of running and waiting for jobs into their own package, apart from the code that defines what a job does: https://gist.github.com/gammazero/bb543631f25a73d1a3f6c8e1a6970eb8

JennyMet commented 4 years ago

@gammazero @oze4 - Thank you!