alitto / pond

🔘 Minimalistic and High-performance goroutine worker pool written in Go
MIT License
1.45k stars · 64 forks

Handling Concurrency #12

Closed · git-blame closed this issue 3 years ago

git-blame commented 3 years ago

Dumb question: can tasks be submitted concurrently to a pool or do I need to use a mutex to control access to it?

I'm adding tasks concurrently. I'm also debugging by printing out the total tasks as well as the waiting tasks (which I call load) and the numbers are all over the place.

2021/07/06 15:34:46 Events.go:632: Event processing total: 119 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 18446744073709551615
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 18446744073709551615
git-blame commented 3 years ago

I'm not sure about the concurrency.

But the waiting task count seems to have an edge condition when it is already 0: the number "wraps around" due to unsigned integer underflow, which matches my output.

from this code:

    // Decrement waiting task count
    atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))
A minimal program demonstrating the underflow:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var a = ^uint64(0)
        var b uint64 = 1
        fmt.Println(a)
        fmt.Println(b)
        atomic.AddUint64(&b, a)
        fmt.Println(b)
        atomic.AddUint64(&b, a)
        fmt.Println(b)
    }

yields:

18446744073709551615
1
0
18446744073709551615
alitto commented 3 years ago

Hey @git-blame,

can tasks be submitted concurrently to a pool or do I need to use a mutex to control access to it?

Yes, pools can be safely shared across goroutines without using a mutex.

Regarding the WaitingTasks() method, it seems to be possible to see 18446744073709551615 for a brief period of time, but it should always go back to 0 eventually (unless I'm missing some edge case). Could you provide a minimal example of this behavior? I'd like to reproduce it to see if this can be improved.

Thanks for reaching out 🙂

git-blame commented 3 years ago

I'm just testing with processing a large number of concurrent POST requests. I'm trying to find a point in my system when the pool queue might be "too large" (e.g., would take X seconds to process) and switch to writing the requests to disk for later processing. That's why I'm checking WaitingTasks() value before submitting a task to see if it exceeds my threshold.

I guess my in-memory processing is fast enough that there are rarely any tasks in the queue. Yet there are enough concurrent calls to WaitingTasks() that some of them see this temporary value.

alitto commented 3 years ago

I see. Yes, for that use case you need a reliable counter. I just submitted a fix for this particular scenario (see https://github.com/alitto/pond/pull/14/files) that ensures the waitingTasks counter never wraps around. Essentially, it changes the order in which the counter is incremented and decremented, to guarantee the increment always executes before the corresponding decrement. The version that includes the fix is v1.5.1. Please try it out and let me know your thoughts. Thanks again!

git-blame commented 3 years ago

Great. No more underflow counts. Thanks.