decred / dcrpool

decred mining pool
ISC License

pool: Don't potentially leak paymgr goroutine. #394

Closed davecgh closed 11 months ago

davecgh commented 12 months ago

This ensures the payment manager goroutine that sends rescan responses respects the context, so the goroutine is not leaked if the context is canceled before a response is received.

It also corrects the channel close logic by moving it into the goroutine. As mentioned in previous commits, receivers should never close channels; only senders should, and only once they are sure no other goroutine can still write to the channel.

davecgh commented 12 months ago

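For reference, the following program illustrates why receivers should never close channels. With senderCloses set to false, the receiver closes the channel when its context is canceled while the sender may still be trying to send, which can panic with "send on closed channel" depending on timing.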

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    const senderCloses = true // Set to false to make the receiver close the channel instead.

    start := time.Now()
    defer func() {
        fmt.Println("elapsed:", time.Since(start))
    }()

    // Create two independent contexts that finish at different times to clearly
    // illustrate what can happen when the receiver goroutine notices the
    // context is done first while the sender goroutine simultaneously selects
    // the send channel.
    ctx1, cancel1 := context.WithCancel(context.Background())
    ctx2, cancel2 := context.WithCancel(context.Background())
    time.AfterFunc(time.Millisecond*250, cancel1)
    time.AfterFunc(time.Millisecond*200, cancel2)

    // Create sender goroutine that uses ctx1 and receiver that uses ctx2.
    ch := make(chan int)
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        defer func() {
            if senderCloses {
                close(ch)
                fmt.Println("sender closed channel after", time.Since(start))
            }
        }()
        ticker := time.NewTicker(time.Millisecond * 100)
        defer ticker.Stop()
        var counter int
        for {
            select {
            case <-ctx1.Done():
                fmt.Println("sender done (via ticker wait) after", time.Since(start))
                return
            case <-ticker.C:
                counter++
                select {
                case <-ctx1.Done():
                    fmt.Println("sender done (via send wait) after", time.Since(start))
                    return
                case ch <- counter:
                    fmt.Printf("sent %d to channel\n", counter)
                }
            }
        }
    }()
    go func() {
        defer wg.Done()
        defer func() {
            if !senderCloses {
                close(ch)
                fmt.Println("receiver closed channel after", time.Since(start))
            }
        }()
        for {
            select {
            case <-ctx2.Done():
                fmt.Println("receiver done (via receive wait) after", time.Since(start))
                return
            case i, ok := <-ch:
                if !ok {
                    fmt.Println("receiver noticed sender closed channel after", time.Since(start))
                    fmt.Println("receiver done (via closed channel) after", time.Since(start))
                    return
                }
                fmt.Println("received", i)
            }
        }
    }()
    wg.Wait()
}
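
The program above depends on timing, so the failure may not show up on every run. As a smaller, deterministic sketch of the underlying rule, the snippet below closes a channel and then shows that receiving from it is safe while sending on it panics. The helper `trySend` is hypothetical, and `recover` is used only to make the panic observable, not as a suggested way to handle it.

```go
package main

import "fmt"

// trySend is a hypothetical helper that attempts a send and reports whether
// the send panicked.
func trySend(ch chan int, v int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	ch := make(chan int, 1)
	close(ch) // stands in for a receiver closing while a sender is still active

	// Receiving from a closed channel yields the zero value and ok == false.
	v, ok := <-ch
	fmt.Println("receive from closed channel:", v, ok)

	// Sending on a closed channel panics, which is why only the sender should
	// close the channel.
	fmt.Println("send on closed channel panicked:", trySend(ch, 1))
}
```

A receiver can always detect a closed channel safely, but a sender has no safe way to check before sending, so the close has to happen on the sending side.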