AmitKumarDas / fun-with-programming

ABC - Always Be Coding
2 stars 2 forks source link

[go][channel] - parallel as well as memory bounded #1

Closed AmitKumarDas closed 2 years ago

AmitKumarDas commented 3 years ago

Reference: https://blog.golang.org/pipelines (Go Concurrency Patterns: Pipelines and cancellation)

// producer
// result carries the outcome of reading and hashing a single file.
type result struct {
    path string // path of the file that was read
    sum  [md5.Size]byte // MD5 digest of the bytes that were read
    err  error // error returned while reading the file, if any
}
// sumFiles walks the tree rooted at root and hashes every regular file,
// launching one goroutine per file (so the number of goroutines, and the
// memory they hold, is not bounded).
//
// It returns two channels:
//   - the first delivers one result per file and is closed once every
//     per-file goroutine has finished;
//   - the second has capacity 1 and receives the error returned by
//     filepath.Walk (nil on success).
//
// Closing done makes pending sends give up and aborts the walk with a
// "walk canceled" error.
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    results := make(chan result)   // unbuffered: each send waits for the consumer
    walkErr := make(chan error, 1) // buffered: the single send below never blocks

    go func() {
        var pending sync.WaitGroup

        // hashFile reads one file and delivers its digest, unless done is
        // closed first.
        hashFile := func(file string) {
            data, err := ioutil.ReadFile(file)
            select {
            case results <- result{file, md5.Sum(data), err}:
            case <-done:
            }
            pending.Done()
        }

        // visit is called by filepath.Walk for every entry in the tree.
        visit := func(path string, info os.FileInfo, err error) error {
            switch {
            case err != nil:
                return err
            case !info.Mode().IsRegular():
                return nil
            }

            // Register the worker before starting it so that Wait cannot
            // observe a zero counter while work is still being scheduled.
            pending.Add(1)
            go hashFile(path)

            // Non-blocking check: stop walking as soon as done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        }

        err := filepath.Walk(root, visit)

        // The walk is over, so no further Add calls can happen. Close the
        // results channel once the outstanding workers have finished.
        go func() {
            pending.Wait()
            close(results)
        }()

        walkErr <- err
    }()

    return results, walkErr
}
AmitKumarDas commented 3 years ago
// consumer
// MD5All hashes every regular file under root and returns a map from file
// path to MD5 digest. The first read error, or an error from the walk
// itself, is returned with a nil map.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    // done is closed on every return path, including early ones, so the
    // goroutines started by sumFiles are told to stop even when not all of
    // their values have been received.
    done := make(chan struct{})
    defer close(done)

    results, walkErr := sumFiles(done, root)

    sums := make(map[string][md5.Size]byte)
    for {
        res, open := <-results
        if !open {
            // results was closed: every file has been reported.
            break
        }
        if res.err != nil {
            return nil, res.err
        }
        sums[res.path] = res.sum
    }

    // Finally pick up the outcome of the directory walk.
    if err := <-walkErr; err != nil {
        return nil, err
    }
    return sums, nil
}
AmitKumarDas commented 3 years ago
// bounded alternative
// keep goroutines under check

// walkFiles walks the tree rooted at root in a single goroutine and sends
// the path of every regular file on the first returned channel, which is
// closed when the walk ends. The second channel has capacity 1 and receives
// the error returned by filepath.Walk (nil on success).
//
// Closing done aborts the walk with a "walk canceled" error.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    out := make(chan string)       // unbuffered: the walk advances only as fast as paths are consumed
    walkErr := make(chan error, 1) // buffered: the single send below never blocks

    // emit is called by filepath.Walk for every entry in the tree.
    emit := func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if info.Mode().IsRegular() {
            // Block until a consumer takes the path or done is closed.
            select {
            case out <- path:
            case <-done:
                return errors.New("walk canceled")
            }
        }
        return nil
    }

    go func() {
        // Closing out after the walk tells consumers no more paths follow.
        defer close(out)
        walkErr <- filepath.Walk(root, emit)
    }()

    return out, walkErr
}
// MD5All hashes every regular file under root using a fixed pool of
// digester goroutines and returns a map from file path to MD5 digest. The
// first read error, or an error from the walk itself, is returned with a
// nil map.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    // done is closed on every return path so the walker and the digesters
    // stop even when this function returns early.
    done := make(chan struct{})
    defer close(done)

    paths, errc := walkFiles(done, root)

    // A fixed number of workers read and hash files, which caps how many
    // files are processed at once.
    const numDigesters = 20
    results := make(chan result)

    var workers sync.WaitGroup
    for n := numDigesters; n > 0; n-- {
        workers.Add(1)
        go func() {
            defer workers.Done()
            digester(done, paths, results)
        }()
    }

    // Close results once every worker has returned so the loop below ends.
    go func() {
        workers.Wait()
        close(results)
    }()

    sums := make(map[string][md5.Size]byte)
    for res := range results {
        if res.err != nil {
            return nil, res.err
        }
        sums[res.path] = res.sum
    }

    // Finally pick up the outcome of the directory walk.
    if err := <-errc; err != nil {
        return nil, err
    }
    return sums, nil
}
// digester is the body of one worker goroutine. It takes file paths from
// paths until that channel is closed, reads and hashes each file, and sends
// the outcome (including any read error) on c. It returns early if done is
// closed while it is waiting to send.
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for {
        file, ok := <-paths
        if !ok {
            // paths was closed: nothing left to process.
            return
        }

        contents, readErr := ioutil.ReadFile(file)
        outcome := result{path: file, sum: md5.Sum(contents), err: readErr}

        select {
        case <-done:
            return
        case c <- outcome:
        }
    }
}