golang / go

The Go programming language
https://go.dev
BSD 3-Clause "New" or "Revised" License
123.97k stars 17.67k forks source link

x/time/rate: Limiter allows more than the configured rate with multiple goroutines #65508

Open ianzhang1988 opened 9 months ago

ianzhang1988 commented 9 months ago

Go version

go version go1.18.5 linux/amd64

Output of go env in your module/workspace:

GO111MODULE="on"
GOARCH="amd64"
GOBIN=""
GOCACHE="/root/.cache/go-build"
GOENV="/root/.config/go/env"
GOEXE=""
GOEXPERIMENT=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="linux"
GOINSECURE=""
GOMODCACHE="/root/.gvm/pkgsets/go1.18.5/global/pkg/mod"
GONOPROXY=""
GONOSUMDB=""
GOOS="linux"
GOPATH="/root/.gvm/pkgsets/go1.18.5/global"
GOPRIVATE=""
GOPROXY="https://goproxy.cn,direct"
GOROOT="/root/.gvm/gos/go1.18.5"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/root/.gvm/gos/go1.18.5/pkg/tool/linux_amd64"
GOVCS=""
GOVERSION="go1.18.5"
GCCGO="gccgo"
GOAMD64="v1"
AR="ar"
CC="gcc"
CXX="g++"
CGO_ENABLED="0"
GOMOD="/data/zhangyang/goproject/go.mod"
GOWORK=""
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -m64 -fmessage-length=0 -fdebug-prefix-map=/tmp/go-build3572669853=/tmp/go-build -gno-record-gcc-switches"

What did you do?

Here is my code

package main

import (
    "fmt"
    "time"
    . "golang.org/x/time/rate"
)

// Produce sends the sequence 0, 1, 2, ... on ch, one value per send, and
// never returns. Each send blocks until ch can accept the value.
func Produce(ch chan uint64) {
    for next := uint64(0); ; next++ {
        ch <- next
    }
}

// ConsumWithLimitDelay loops forever. On each iteration it takes a
// reservation from lim; if the reservation reports OK, it sleeps for the
// reservation's Delay and then moves one value from ch to chout. A
// reservation that is not OK is dropped and a new one is requested right away.
func ConsumWithLimitDelay(ch chan uint64, chout chan uint64, lim *Limiter) {
    for {
        resv := lim.Reserve()
        if resv.OK() {
            // Wait out the delay the limiter asked for before forwarding.
            time.Sleep(resv.Delay())
            v := <-ch
            chout <- v
        }
    }
}

// Count receives from ch forever and reports throughput. After every receive
// it checks how long the current window has lasted; once that exceeds one
// second it prints the number of values received in the window together with
// the window's duration, then starts a new window at the moment just measured.
func Count(ch chan uint64) {
    var (
        received    uint64
        windowStart = time.Now()
    )

    for {
        <-ch
        received++

        elapsed := time.Since(windowStart)
        if elapsed <= time.Second {
            continue
        }

        fmt.Printf("%d %v\n", received, elapsed)
        received = 0
        // Advance by the measured duration so the next window begins where
        // this one ended.
        windowStart = windowStart.Add(elapsed)
    }
}

// main wires the reproducer together: one producer feeds a buffered channel,
// a pool of consumer goroutines sharing a single Limiter forwards values to a
// second buffered channel, and Count reports how many values arrive there.
func main() {
    const (
        limit       = 200000.0 // first argument to NewLimiter
        burst       = 1000     // second argument to NewLimiter
        consumers   = 100      // goroutines sharing the limiter
        inputDepth  = 1000
        outputDepth = 100
    )

    in := make(chan uint64, inputDepth)
    out := make(chan uint64, outputDepth)
    go Produce(in)

    shared := NewLimiter(limit, burst)
    for started := 0; started < consumers; started++ {
        go ConsumWithLimitDelay(in, out, shared)
    }

    // Count never returns, so it keeps the program alive.
    Count(out)
}

The limit is set to 200k, but the rate actually observed on my machine is 300k.

Here is what I think is going wrong in rate.go: sometimes lim.advance(t) returns a time t that is in the past (earlier than lim.last); that time is then written to lim.last, which causes the problem. (See the // !!! comments in the code below.)

// reserveN tries to reserve n tokens as of time t and returns the resulting
// Reservation. lim.mu is held for the whole call.
//
//   - t is the caller-supplied time at which the reservation is evaluated.
//   - n is the number of tokens requested.
//   - maxFutureReserve is the longest wait the caller accepts.
//
// In the general path the reservation is ok only when n <= lim.burst and the
// computed wait does not exceed maxFutureReserve; lim.last, lim.tokens and
// lim.lastEvent are updated only in that case.
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
    lim.mu.Lock()
    defer lim.mu.Unlock()

    if lim.limit == Inf {
        // Infinite limit: always ok, acts at t, and no limiter state changes.
        return Reservation{
            ok:        true,
            lim:       lim,
            tokens:    n,
            timeToAct: t,
        }
    } else if lim.limit == 0 {
        // Zero limit: the request is served from burst only, which is
        // reduced by n when it is large enough.
        var ok bool
        if lim.burst >= n {
            ok = true
            lim.burst -= n
        }
        // NOTE(review): tokens records the remaining burst here, whereas the
        // other paths record n; confirm this is intended.
        return Reservation{
            ok:        ok,
            lim:       lim,
            tokens:    lim.burst,
            timeToAct: t,
        }
    }

    // !!! Reporter's note: with the following block enabled, rate.go limits
    // correctly.
    // if t.Before(lim.last) {
    //  return Reservation{
    //      ok:    false,
    //      lim:   lim,
    //      limit: lim.limit,
    //  }
    // }

    // !!! Reporter's note: with multiple goroutines, t can be in the past
    // (earlier than lim.last) at this point.
    t, tokens := lim.advance(t)

    // Calculate the remaining number of tokens resulting from the request.
    tokens -= float64(n)

    // Calculate the wait duration: a negative balance is converted into the
    // time needed to make up the deficit.
    var waitDuration time.Duration
    if tokens < 0 {
        waitDuration = lim.limit.durationFromTokens(-tokens)
    }

    // Decide result.
    ok := n <= lim.burst && waitDuration <= maxFutureReserve

    // Prepare reservation.
    r := Reservation{
        ok:    ok,
        lim:   lim,
        limit: lim.limit,
    }
    if ok {
        r.tokens = n
        r.timeToAct = t.Add(waitDuration)

        // !!! Reporter's note: sometimes this sets lim.last to a time in the
        // past, which causes the problem.
        // Update state.
        lim.last = t
        lim.tokens = tokens
        lim.lastEvent = r.timeToAct
    }

    return r
}

// advance computes the token balance the limiter would have at time t, from
// lim.last, lim.tokens, lim.limit and lim.burst. It assigns to no field of
// lim; the caller decides whether to store the results.
//
// When t is earlier than lim.last the elapsed time is taken as zero, and that
// earlier t is still returned unchanged as newT.
func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
    // Reference point for elapsed time; lowered to t when t precedes it.
    last := lim.last
    if t.Before(last) {
        last = t
    }

    // Add the tokens generated over the elapsed interval to the balance.
    newTokens = lim.tokens + lim.limit.tokensFromDuration(t.Sub(last))

    // The balance never exceeds the burst size.
    if ceiling := float64(lim.burst); newTokens > ceiling {
        newTokens = ceiling
    }
    return t, newTokens
}

What did you see happen?

The limiter does not limit properly when it is used from multiple goroutines.

What did you expect to see?

The limiter should limit properly when it is used from multiple goroutines.

seankhliao commented 9 months ago

cc @Sajmani maybe?

It does appear that drift can be an issue because the time passed to reserveN is captured before the lock is obtained (so setting t = time.Now() inside reserveN results in an accurate limit).

related #23145 ?

ianzhang1988 commented 8 months ago

Is there any update on this?

ianzhang1988 commented 8 months ago

I think the fix is as simple as the change below.

// advance (proposed fix) computes the token balance the limiter would have at
// time t, from lim.last, lim.tokens, lim.limit and lim.burst.
//
// When t is earlier than lim.last, t is moved forward to lim.last, so the
// elapsed time is zero and the returned newT is never earlier than lim.last.
func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
    last := lim.last
    if t.Before(last) {
        // was: last = t
        t = last // here (the proposed change)
    }

    // Add the tokens generated over the elapsed interval to the balance.
    newTokens = lim.tokens + lim.limit.tokensFromDuration(t.Sub(last))

    // The balance never exceeds the burst size.
    if ceiling := float64(lim.burst); newTokens > ceiling {
        newTokens = ceiling
    }
    return t, newTokens
}

If t is in the past, advance should not produce any tokens, in the sense that the tokens for this t were already produced by the previous call to advance.