gocraft / work

Process background jobs in Go
MIT License

Workers stuck #192

Open d1slike opened 3 years ago

d1slike commented 3 years ago

Context

Expected behavior

Current behavior

How to reproduce

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "time"

    "github.com/gocraft/work"
    "github.com/gomodule/redigo/redis"
)

var redisPool = &redis.Pool{
    MaxActive: 5,
    MaxIdle:   5,
    Wait:      true,
    Dial: func() (redis.Conn, error) {
        return redis.Dial("tcp", ":6379", redis.DialDatabase(3))
    },
}

const (
    namespace   = "qwe"
    jobname     = "asd"
    concurrency = 10
)

var enqueuer = work.NewEnqueuer(namespace, redisPool)

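type ctx struct{}

// do logs a liveness line, sleeps briefly, then re-enqueues the same job
// type so the queue never drains while the pool is running.
func (*ctx) do(_ *work.Job) error {
    fmt.Println("i'm alive", time.Now())
    time.Sleep(time.Millisecond * 5)
    _, err := enqueuer.Enqueue(jobname, nil)
    return err
}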

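func main() {
    // Seed the queue with twice as many jobs as there are workers.
    for i := 0; i < concurrency*2; i++ {
        _, err := enqueuer.Enqueue(jobname, nil)
        if err != nil {
            log.Fatal(err)
        }
    }

    wp := work.NewWorkerPool(ctx{}, concurrency, namespace, redisPool)
    wp.JobWithOptions(jobname, work.JobOptions{MaxConcurrency: 200}, (*ctx).do)
    wp.Start()

    // os.Kill (SIGKILL) cannot be caught, so only os.Interrupt is registered.
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt)
    <-signalChan

    // Stop the pool's workers before exiting.
    wp.Stop()
}
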
diff --git a/vendor/github.com/gocraft/work/dead_pool_reaper.go b/vendor/github.com/gocraft/work/dead_pool_reaper.go
index e930521e..4e1d4534 100644
--- a/vendor/github.com/gocraft/work/dead_pool_reaper.go
+++ b/vendor/github.com/gocraft/work/dead_pool_reaper.go
@@ -10,9 +10,9 @@ import (
 )

 const (
-       deadTime          = 10 * time.Second // 2 x heartbeat
-       reapPeriod        = 10 * time.Minute
-       reapJitterSecs    = 30
+       deadTime          = 1 * time.Second // 2 x heartbeat
+       reapPeriod        = 1 * time.Second
+       reapJitterSecs    = 1
        requeueKeysPerJob = 4
 )

Detailed description

Reapers

  1. A reaper goroutine periodically checks worker pool heartbeats to see whether the pools are alive. By default it runs every 10 minutes (reapPeriod) plus a random jitter of under 30 seconds (reapJitterSecs).
  2. If a heartbeat is found for a pool ID and has not expired, the reaper moves on to the next pool ID.
  3. If the heartbeat for the pool ID has expired, the jobs that pool has in progress are moved back to the queue.
  4. After the heartbeat is removed, or if no heartbeat was found, the pool ID is removed from worker_pools.
  5. The lock is decremented by the value stored in lock_info for the pool ID, that entry is removed from lock_info, and if the lock has dropped below zero it is reset to zero.

The problem

There is a race condition in the reaper logic described above.

Possible Solution

imKarthikeyanK commented 1 year ago

Hi, this is very well reported. Thanks a lot.

I am facing a similar issue as well. I can see your PR was not merged. Did you find any other workaround for this?