RussellLuo / timingwheel

Golang implementation of Hierarchical Timing Wheels.
MIT License

Timing wheel reliably fails to run scheduled tasks #5

Closed: histalk closed this issue 5 years ago

histalk commented 5 years ago

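package timingwheel_test

import (
    "fmt"
    "sync/atomic"
    "testing"
    "time"

    "github.com/RussellLuo/timingwheel"
)

// AtomicInt32 is a small wrapper around an int32 that is updated atomically.
type AtomicInt32 struct {
    int32
}

func NewAtomicInt32(n int32) AtomicInt32 {
    return AtomicInt32{n}
}

// Add atomically adds n to the value.
func (i *AtomicInt32) Add(n int32) int32 {
    return atomic.AddInt32(&i.int32, n)
}

var timeCnt = NewAtomicInt32(0)
var tw = timingwheel.NewTimingWheel(time.Millisecond*1, 3)

func init() {
    tw.Start()
}

// CycleTest schedules a 6ms callback that re-schedules itself from inside
// the callback, until the counter reaches 1,000,000.
func CycleTest() {
    tw.AfterFunc(6*time.Millisecond, func() {
        c := timeCnt.Add(1)

        // The printed output often stops somewhere between 1k and 100k.
        // It shows up intermittently across repeated runs, or when the
        // test is left running for a long time.
        fmt.Println(c)

        if c < 1000000 {
            CycleTest()
        }
    })
}

func TestTimeS(t *testing.T) {
    CycleTest()
    // Block forever so the chain of callbacks keeps running.
    <-make(chan struct{})
}
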
youkale commented 5 years ago

I ran into the same problem. I suspect the delay queue is to blame.

RussellLuo commented 5 years ago

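@histalk I see you changed the title to "reliably reproducible". Is the symptom still that "the printed output stops somewhere between 1k and 100k"? I tried it on my Mac and it got to at least 300k (it's still running; waiting for 1M would take too long).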

@youkale Could you share the conditions under which the problem reliably reproduces?

RussellLuo commented 5 years ago

I. Symptom

I did get the error this time:

303916
303917
303918
303919
303920
303921
303922
303923
303924
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive, 32 minutes]:
main.main()
        /Users/russellluo/projects/go/src/github.com/RussellLuo/labs/try-timingwheel/main.go:47 +0x52

goroutine 5 [select]:
github.com/RussellLuo/timingwheel.(*delayQueue).Poll(0xc000086000, 0xc00001e240)
        /Users/russellluo/projects/go/src/github.com/RussellLuo/timingwheel/delayqueue.go:147 +0x3cf
github.com/RussellLuo/timingwheel.(*TimingWheel).Start.func1()
        /Users/russellluo/projects/go/src/github.com/RussellLuo/timingwheel/timingwheel.go:135 +0x37
github.com/RussellLuo/timingwheel.(*waitGroupWrapper).Wrap.func1(0xc00000e1e0, 0xc000088058)
        /Users/russellluo/projects/go/src/github.com/RussellLuo/timingwheel/utils.go:29 +0x27
created by github.com/RussellLuo/timingwheel.(*waitGroupWrapper).Wrap
        /Users/russellluo/projects/go/src/github.com/RussellLuo/timingwheel/utils.go:28 +0x62

goroutine 6 [select]:
github.com/RussellLuo/timingwheel.(*TimingWheel).Start.func2()
        /Users/russellluo/projects/go/src/github.com/RussellLuo/timingwheel/timingwheel.go:140 +0x11b
github.com/RussellLuo/timingwheel.(*waitGroupWrapper).Wrap.func1(0xc00000e1f0, 0xc000088058)
        /Users/russellluo/projects/go/src/github.com/RussellLuo/timingwheel/utils.go:29 +0x27
created by github.com/RussellLuo/timingwheel.(*waitGroupWrapper).Wrap
        /Users/russellluo/projects/go/src/github.com/RussellLuo/timingwheel/utils.go:28 +0x62
exit status 2

II. Root cause analysis

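After some analysis, my preliminary conclusion is that there is a race between delayQueue.Offer and delayQueue.Poll. One possible interleaving that leads to the deadlock above: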

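  1. [delayQueue.Poll] calls PeekAndShift to fetch an item and gets nothing back
  2. [delayQueue.Offer] calls heap.Push to add an item
  3. [delayQueue.Offer] sees that the added item is now first in the queue, but the rescheduling flag has not been set yet, so it does not send a notification on readyC
  4. [delayQueue.Poll] sees that it got no item, so it sets the rescheduling flag and blocks waiting on readyC, for a notification that never arrives because step 3 already skipped it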
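The interleaving above can be replayed deterministically in a single goroutine. The sketch below assumes a hypothetical racyQueue that models only the state the scenario touches (the heap, the rescheduling flag, and whether anything was sent on readyC); it is not the library's delayQueue.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// racyQueue is a hypothetical model of the pre-fix state: the heap,
// the rescheduling flag, and whether a notification was sent on readyC.
type racyQueue struct {
	items        []int
	rescheduling int32
	notified     bool
}

// replay runs the four steps in exactly the order listed above and reports
// whether Poll ends up waiting on readyC with no notification on the way,
// plus how many items are sitting in the queue at that point.
func replay() (lostWakeup bool, queued int) {
	q := &racyQueue{}

	// 1. [Poll] PeekAndShift: the queue is empty, nothing is returned.
	empty := len(q.items) == 0

	// 2. [Offer] heap.Push adds an item.
	q.items = append(q.items, 42)

	// 3. [Offer] The new item is first, but the flag is not set yet,
	//    so the CAS fails and nothing is sent on readyC.
	if len(q.items) == 1 && atomic.CompareAndSwapInt32(&q.rescheduling, 1, 0) {
		q.notified = true
	}

	// 4. [Poll] Acting on its stale result from step 1, Poll sets the flag
	//    and would now block on readyC.
	if empty {
		atomic.StoreInt32(&q.rescheduling, 1)
		return !q.notified, len(q.items)
	}
	return false, len(q.items)
}

func main() {
	lost, queued := replay()
	fmt.Println(lost, queued)
}
```

Running it prints `true 1`: one item is queued, Poll is about to block on readyC, and no notification is coming.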

III. Solution

It is enough to change delayQueue.Poll so that dq.pq.PeekAndShift(now) and atomic.StoreInt32(&dq.sleeping, 1) execute together as one atomic step.

histalk commented 5 years ago

For now I changed it like this; there is probably a better solution:

// Offer inserts the bucket into the current queue.
func (dq *delayQueue) Offer(b *bucket) {
    item := &item{Value: b, Priority: b.Expiration()}

    dq.mu.Lock()
    heap.Push(&dq.pq, item)
    if item.Index == 0 {
        // A new item with the earliest expiration is added.
        // The CAS runs under dq.mu, so it cannot interleave with
        // Poll's PeekAndShift + StoreInt32 sequence.
        if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
            // Release dq.mu before the blocking send on wakeupC so the
            // lock is not held while waiting for Poll to receive.
            dq.mu.Unlock()
            dq.wakeupC <- struct{}{}
            return
        }
    }
    dq.mu.Unlock()
}

// Poll starts an infinite loop, in which it continually waits for a bucket to
// expire and then sends the expired bucket to the timing wheel via the channel C.
func (dq *delayQueue) Poll(exitC chan struct{}) {
    for {
        now := timeToMs(time.Now())

        dq.mu.Lock()
        item, delta := dq.pq.PeekAndShift(now)
        if item == nil {
            // No item has expired yet. Set the sleeping flag while still
            // holding dq.mu, so an Offer cannot push an earlier item between
            // PeekAndShift and this store (the race described above).
            atomic.StoreInt32(&dq.sleeping, 1)
            dq.mu.Unlock()

            if delta == 0 {
                // The queue is empty: wait until a new item is added.
                select {
                case <-dq.wakeupC:
                    continue
                case <-exitC:
                    goto exit
                }
            } else if delta > 0 {
                // At least one item is pending: wait for it to expire.
                select {
                case <-dq.wakeupC:
                    // A new item with an "earlier" expiration than the current "earliest" one is added.
                    continue
                case <-time.After(time.Duration(delta) * time.Millisecond):
                    // The current "earliest" item expires.

                    // Reset the sleeping state since there's no need to receive from wakeupC.
                    if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
                        // A caller of Offer() is being blocked on sending to wakeupC,
                        // drain wakeupC to unblock the caller.
                        <-dq.wakeupC
                    }
                    continue
                case <-exitC:
                    goto exit
                }
            }
            // PeekAndShift never returns a negative delta, so this is not
            // reached in practice; it keeps the loop from falling through to
            // the second Unlock below, since dq.mu is already released.
            continue
        }
        dq.mu.Unlock()

        b := item.Value.(*bucket)
        select {
        case dq.C <- b:
            // Send the expired bucket to the timing wheel.
        case <-exitC:
            goto exit
        }
    }

exit:
    // Reset the states
    atomic.StoreInt32(&dq.sleeping, 0)
}
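The drain in the time.After branch is the subtle part of this patch: once an Offer wins the CAS it is committed to an unbuffered send on wakeupC, so if Poll's timer case is selected instead, nobody would ever receive that send. The sketch below replays that interleaving with local variables standing in for dq.sleeping and dq.wakeupC; timerFiresDuringOffer is a hypothetical helper, not library code. It should report true with the drain and false without it.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// timerFiresDuringOffer replays one interleaving of the patched code:
// Poll is sleeping in the delta > 0 branch, an Offer wins the CAS and
// starts sending on wakeupC, and then Poll's timer case runs.
// drain toggles the "<-dq.wakeupC" step; it reports whether Offer returns.
func timerFiresDuringOffer(drain bool) bool {
	var sleeping int32 = 1         // Poll has announced it is sleeping
	wakeupC := make(chan struct{}) // unbuffered, like dq.wakeupC
	casDone := make(chan struct{})
	offerDone := make(chan struct{})

	// Offer side: the CAS 1 -> 0 succeeds, so it must send on wakeupC.
	go func() {
		if atomic.CompareAndSwapInt32(&sleeping, 1, 0) {
			close(casDone)
			wakeupC <- struct{}{}
		}
		close(offerDone)
	}()
	<-casDone

	// Poll side, time.After case: reset the flag. A previous value of 0
	// means an Offer already committed to sending, so receive it.
	if atomic.SwapInt32(&sleeping, 0) == 0 && drain {
		<-wakeupC
	}

	select {
	case <-offerDone:
		return true
	case <-time.After(500 * time.Millisecond):
		return false // Offer is still blocked on wakeupC
	}
}

func main() {
	fmt.Println(timerFiresDuringOffer(true), timerFiresDuringOffer(false))
}
```

Without the drain, the Offer goroutine stays blocked on the send, which in the real queue would also stall every later caller of Offer that reaches that send.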