alitto / pond

πŸ”˜ Minimalistic and High-performance goroutine worker pool written in Go
MIT License
1.5k stars 65 forks source link

Data race due to "Unsynchronized send and close operations" #70

Closed hongkuancn closed 1 month ago

hongkuancn commented 2 months ago

Hi!

I came across a data race with the test

func TestPoolWithCustomIdleTimeoutNew(t *testing.T) {

    pool := pond.New(10, 15, pond.IdleTimeout(100*time.Millisecond))

    for i := 0; i < 10; i++ {
        pool.Submit(func() {
            time.Sleep(10 * time.Millisecond)
        })
    }

    assertEqual(t, 10, pool.RunningWorkers())

    time.Sleep(200 * time.Millisecond)

    pool.StopAndWait()
}
➜  pond git:(main) βœ— go test -race -v -coverprofile=coverage.out -covermode=atomic -run TestPoolWithCustomIdleTimeoutNew
=== RUN   TestPoolWithCustomIdleTimeoutNew
==================
WARNING: DATA RACE
Write at 0x00c000128130 by goroutine 6:
  runtime.closechan()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/runtime/chan.go:357 +0x0
  github.com/alitto/pond.(*WorkerPool).stop.func1()
      /Users/whk/Documents/code/pond/pond.go:375 +0x56
  sync.(*Once).doSlow()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/sync/once.go:74 +0xf0
  sync.(*Once).Do()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/sync/once.go:65 +0x44
  github.com/alitto/pond.(*WorkerPool).stop()
      /Users/whk/Documents/code/pond/pond.go:374 +0x153
  github.com/alitto/pond.(*WorkerPool).StopAndWait()
      /Users/whk/Documents/code/pond/pond.go:334 +0x25b
  github.com/alitto/pond_test.TestPoolWithCustomIdleTimeoutNew()
      /Users/whk/Documents/code/pond/pond_blackbox_test.go:412 +0x1fb
  testing.tRunner()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1689 +0x21e
  testing.(*T).Run.gowrap1()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1742 +0x44

Previous read at 0x00c000128130 by goroutine 7:
  runtime.chansend()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/runtime/chan.go:160 +0x0
  github.com/alitto/pond.(*WorkerPool).maybeStopIdleWorker()
      /Users/whk/Documents/code/pond/pond.go:409 +0x355
  github.com/alitto/pond.(*WorkerPool).purge()
      /Users/whk/Documents/code/pond/pond.go:393 +0x356
  github.com/alitto/pond.New.gowrap1()
      /Users/whk/Documents/code/pond/pond.go:144 +0x33

Goroutine 6 (running) created at:
  testing.(*T).Run()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1742 +0x825
  testing.runTests.func1()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:2161 +0x85
  testing.tRunner()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1689 +0x21e
  testing.runTests()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:2159 +0x8be
  testing.(*M).Run()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:2027 +0xf17
  main.main()
      _testmain.go:159 +0x2e4

Goroutine 7 (finished) created at:
  github.com/alitto/pond.New()
      /Users/whk/Documents/code/pond/pond.go:144 +0x6ad
  github.com/alitto/pond_test.TestPoolWithCustomIdleTimeoutNew()
      /Users/whk/Documents/code/pond/pond_blackbox_test.go:380 +0xe4
  testing.tRunner()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1689 +0x21e
  testing.(*T).Run.gowrap1()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1742 +0x44
==================
    testing.go:1398: race detected during execution of test
--- FAIL: TestPoolWithCustomIdleTimeoutNew (0.20s)
FAIL

It seems the issue is related to purge() send nil task to the task channel to recycle the workers. Meanwhile, stop() close the task channel. The race detector will report the issue according to Unsynchronized send and close operations | Data Race Detector - The Go Programming Language

alitto commented 1 month ago

Hey @hongkuancn, thanks for opening this issue. It looks like this send inside maybeStopIdleWorker() should be skipped if the context has been canceled already. E.g.:

// maybeStopIdleWorker attempts to stop an idle worker by sending it a nil task
func (p *WorkerPool) maybeStopIdleWorker() bool {

    if decremented := p.decrementWorkerCount(); !decremented {
        return false
    }

    // If the pool context has been canceled the tasks channel could be closed, so sending a nil to it will panic
    select {
        case p.context.Done():
          return false
        default:
    }

    // Send a nil task to stop an idle worker
    p.tasks <- nil

    return true
}

What do you think? Feel free to open a pull request with these changes and I'll be glad to merge it.

hongkuancn commented 1 month ago

Hey @alitto ! Thanks for the suggestion. I noticed there might still be synchronization issue after your change

   stop               purge     
               β”‚                
               β”‚                
               β”‚    new select  
               │◄───────────────
 cancel contextβ”‚                
──────────────►│                
               β”‚                
               β”‚                
 close channel β”‚                
──────────────►│   task <- nil  
               │◄───────────────
               β”‚                
               β–Ό                

I just looked back the history and come up with an idea.

The issue is probably brought in by Pull Request #62. It wants to stop the pool with long running tasks when the context is canceled. However, when draining the tasks, it depends on a closed task channel. Worker goroutines are blocked because of this task channel. So the PR moves close(p.tasks) before p.workersWaitGroup.Wait() in stop() method. But this move can't guarantee the purge goroutine stopped, which leads to the data race.

So I suggest to move close(p.tasks) back and there is another unblocked way to drain the tasks instead of relying on a closed task channel.

func drainTasks(tasks <-chan func(), tasksWaitGroup *sync.WaitGroup) {
    for {
        select {
        case task, ok := <-tasks:
            if task != nil && ok {
                tasksWaitGroup.Done()
            }
        default:
            return
        }
    }
}

So after the change, the stop() method has the following steps:

  1. Wait the tasks done. (If context is canceled explicitly, long running tasks will be drained beforehand. Task waitgroup won't be blocked. All worker and purge goroutines are probably returned here, otherwise goroutines are returned in step 3)
  2. Reset worker count.
  3. Cancel the context.
  4. Wait all worker and purge goroutines done.
  5. Close the task channel.

What do you think? Please let me know if anything I missed out.

alitto commented 1 month ago

That makes a lot of sense, yes. I overlooked that change in https://github.com/alitto/pond/pull/62. As a general rule of thumb, a writable channel shared with N goroutines can only be closed after all of them have returned. I have merged both of your PRs and released them as part of v1.9.2 :rocket: Thank you for your contributions!