alitto / pond

🔘 Minimalistic and High-performance goroutine worker pool written in Go
MIT License

Clear pending tasks in the worker when the context is canceled to avoid deadlocks in StopAndWait when tasks are queued for the worker. #62

Closed. CorentinClabaut closed this 2 months ago

CorentinClabaut commented 2 months ago

closes #61
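For context, here is a rough sketch of the idea behind this change, with made-up names (a `worker` function and wait-group bookkeeping) rather than pond's actual implementation: once the pool's context is canceled, the worker stops executing queued tasks and simply discards them, but still marks them as done, so a StopAndWait-style wait can finish instead of blocking forever.

package main

import (
    "context"
    "sync"
)

// Hypothetical sketch, not pond's code: drain queued tasks once the context
// is canceled, but keep the wait group balanced so waiting does not deadlock.
func worker(ctx context.Context, tasks <-chan func(), wg *sync.WaitGroup) {
    for task := range tasks {
        select {
        case <-ctx.Done():
            wg.Done() // context canceled: discard the pending task
        default:
            task()
            wg.Done()
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    tasks := make(chan func(), 2)
    var wg sync.WaitGroup

    // Two tasks are queued but never picked up before cancellation.
    wg.Add(2)
    tasks <- func() {}
    tasks <- func() {}

    cancel()     // context canceled while tasks are still queued
    close(tasks) // no more submissions
    worker(ctx, tasks, &wg)
    wg.Wait() // returns because the discarded tasks were still marked done
}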

alitto commented 2 months ago

Hey @CorentinClabaut, thanks for submitting this PR :raised_hands: I noticed the GitHub Actions workflow was specifying an unsupported version of Go (1.15), so I pushed a fix to the master branch (https://github.com/alitto/pond/commit/3f439a7c5bd41de75c44c36b815adfdf0139ea61). Do you mind rebasing this PR so that we can retry the failed pipelines :slightly_smiling_face:?

CorentinClabaut commented 2 months ago

Hey @alitto, no problem about the PR :) I've just merged master, let me know if anything else needs to be done.

alitto commented 2 months ago

It seems a test is failing due to a race condition 🤔. Apparently, the race condition is reported at this line:

  github.com/alitto/pond.(*WorkerPool).stop.func1()
      /home/runner/work/pond/pond/pond.go:358 +0x44

I wonder if that's related to moving the closing of the tasks channel up. I'll continue digging when I get a chance.
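For reference, here is a minimal stand-alone illustration (made-up code, not pond's) of why the ordering of close(tasks) matters: closing a channel while another goroutine may still be sending on it means the pending send panics with "send on closed channel", and the race detector may flag the unsynchronized close and send.

package main

func main() {
    tasks := make(chan func())

    // A "late" submitter, racing with the close below.
    go func() {
        tasks <- func() {}
    }()

    // Closing the channel too early: the blocked send above panics with
    // "send on closed channel", and `go run -race` may report the race.
    close(tasks)

    select {} // keep main alive so the panic in the goroutine is observable
}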

CorentinClabaut commented 2 months ago

I just saw this error as well:

=== RUN   TestSubmitWithContextCancelWithIdleTasks
    pond_blackbox_test.go:582: Expected int32(1) but was int32(2)
--- FAIL: TestSubmitWithContextCancelWithIdleTasks (0.00s)

This one seems to be due to this select:

select {
case <-context.Done():
    ...
case task, ok := <-tasks:
    ...
}

Here, if both channels are ready, either case can be triggered (https://stackoverflow.com/questions/46200343/force-priority-of-go-select-statement).

I can push a fix for this and see if it fixes the race condition as well.
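One common way to give the context case priority, in line with that Stack Overflow thread, is to poll the context in a non-blocking select before entering the blocking one. A minimal sketch with made-up names, not necessarily the exact fix I'll push:

package main

import (
    "context"
    "fmt"
)

// consume drains tasks but always lets cancellation win: the first,
// non-blocking select checks the context before the blocking one runs.
func consume(ctx context.Context, tasks <-chan func()) {
    for {
        select {
        case <-ctx.Done():
            return // cancellation has priority
        default:
        }

        select {
        case <-ctx.Done():
            return
        case task, ok := <-tasks:
            if !ok {
                return
            }
            task()
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    tasks := make(chan func(), 1)
    tasks <- func() { fmt.Println("task executed") }

    cancel() // both cases are ready, but the queued task is never executed
    consume(ctx, tasks)
}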

CorentinClabaut commented 2 months ago

Hey @alitto, it should be all good now. You were right, it was due to moving the closing of the tasks channel up.

CorentinClabaut commented 2 months ago

Hey @alitto

The issue is now:

=== RUN   TestPurgeDuringSubmit
    pond_test.go:62: Expected int(1) but was int(0)
--- FAIL: TestPurgeDuringSubmit (0.00s)

I'm not sure how this issue could be triggered by this PR though.

Could it be that in some situations the purge might reset idleWorkerCount before we do the check in the test?

alitto commented 2 months ago

> Hey @alitto
>
> The issue is now:
>
> === RUN   TestPurgeDuringSubmit
>     pond_test.go:62: Expected int(1) but was int(0)
> --- FAIL: TestPurgeDuringSubmit (0.00s)
>
> I'm not sure how this issue could be triggered by this PR though.
>
> Could it be that in some situations the purge might reset idleWorkerCount before we do the check in the test?

Mhm, I think the idleWorkerCount counter might be slower to update when running in GitHub Actions; I have the feeling I've seen this behavior before. Adding an extra sleep after submitting the first task should help, I think:

// Submit a task to ensure at least 1 worker is started
pool.SubmitAndWait(func() {
    atomic.AddInt32(&doneCount, 1)
})

// Ensure idle worker count is updated
time.Sleep(1 * time.Millisecond)

assertEqual(t, 1, pool.IdleWorkers())

CorentinClabaut commented 2 months ago

> Hey @alitto The issue is now:
>
> === RUN   TestPurgeDuringSubmit
>     pond_test.go:62: Expected int(1) but was int(0)
> --- FAIL: TestPurgeDuringSubmit (0.00s)
>
> I'm not sure how this issue could be triggered by this PR though. Could it be that in some situations the purge might reset idleWorkerCount before we do the check in the test?
>
> Mhm, I think the idleWorkerCount counter might be slower to update when running in GitHub Actions; I have the feeling I've seen this behavior before. Adding an extra sleep after submitting the first task should help, I think:
>
> // Submit a task to ensure at least 1 worker is started
> pool.SubmitAndWait(func() {
>     atomic.AddInt32(&doneCount, 1)
> })
>
> // Ensure idle worker count is updated
> time.Sleep(1 * time.Millisecond)
>
> assertEqual(t, 1, pool.IdleWorkers())

That makes sense. I can see that it's what you have done in TestSubmitToIdle, so I'll add it here and in another test that seems to need it, to make sure this issue doesn't reappear later.

CorentinClabaut commented 2 months ago

Thank you for the merge @alitto :)