wtysos11 / blogWiki

Used to store and organize public papers.

Source-level usage of the Go load-testing framework Vegeta #209

Open wtysos11 opened 3 years ago

wtysos11 commented 3 years ago

Learning and using the Vegeta load-testing tool

Goals:

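  1. Be able to use Vegeta from the command line to test a given API
  2. Understand how to export the results, and which results are available (P99, P99.9, QPS)
  3. Explore whether other results can be exported, and whether complex commands or simple scripts can be run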

Time is tight; I expect to finish within two to three hours.

References:

Installation

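Once GOBIN is on your PATH, go get -u github.com/tsenart/vegeta installs it. (Since Go 1.17, go get is deprecated for installing commands; the equivalent is go install github.com/tsenart/vegeta/v12@latest.)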

Basic usage

Register a simple route on an iris server:

    app.Get("/test/{name}",func(ctx iris.Context){
        name := ctx.Params().Get("name")
        log.Println(name)
    })

Write a simple targets file, test.txt (each line is a method followed by the full URL, including the http:// scheme):

GET http://localhost:8080/test/name1
GET http://localhost:8080/test/name2
GET http://localhost:8080/test/name3

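Run vegeta attack -targets ./test.txt -duration=30s -rate=10000 > result.bin to start the attack; the raw results are written to result.bin. (PS: for some reason -rate=0 was not accepted; it failed with an error asking for a number greater than 0. As far as I know, newer Vegeta releases accept -rate=0 as "unlimited" only when -max-workers is also set.)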

Run vegeta report result.bin to get the report:

Requests      [total, rate]            300000, 9985.22
Duration      [total, attack, wait]    31.3856523s, 30.044419s, 1.3412333s
Latencies     [mean, 50, 95, 99, max]  1.367112453s, 4.50456ms, 4.511788108s, 6.305999861s, 7.6157462s
Bytes In      [total, mean]            0, 0.00
Bytes Out     [total, mean]            0, 0.00
Success       [ratio]                  48.38%
Status Codes  [code:count]             200:145132  0:154868
Error Set:
Get http://localhost:8080/test/name3: dial tcp 0.0.0.0:0->[::1]:8080: bind: An operation on a socket could not be performed because the system lacked sufficient buffer space or because a queue was full.
Get http://localhost:8080/test/name1: dial tcp 0.0.0.0:0->[::1]:8080: bind: An operation on a socket could not be performed because the system lacked sufficient buffer space or because a queue was full.
Get http://localhost:8080/test/name2: dial tcp 0.0.0.0:0->[::1]:8080: bind: An operation on a socket could not be performed because the system lacked sufficient buffer space or because a queue was full.
Get http://localhost:8080/test/name1: dial tcp 0.0.0.0:0->[::1]:8080: connectex: Only one usage of each socket address (protocol/network address/port) is normally permitted.
Get http://localhost:8080/test/name2: dial tcp 0.0.0.0:0->[::1]:8080: connectex: Only one usage of each socket address (protocol/network address/port) is normally permitted.
Get http://localhost:8080/test/name3: dial tcp 0.0.0.0:0->[::1]:8080: connectex: Only one usage of each socket address (protocol/network address/port) is normally permitted.

Status code 0 counts requests that never got a response. Both errors above come from the Windows client while it was opening connections (no buffer space, and the local address/port already in use), which typically means the load generator ran out of ephemeral ports at 10,000 requests/s; the 48.38% success ratio therefore says more about the client machine than about the server.

Vegeta can also be used as a library inside a Go program, for example:

package main

import (
    "fmt"
    "time"

    vegeta "github.com/tsenart/vegeta/v12/lib"
)

func main() {
    rate := vegeta.Rate{Freq: 100, Per: time.Second}
    duration := 4 * time.Second
    targeter := vegeta.NewStaticTargeter(vegeta.Target{
        Method: "GET",
        URL:    "http://localhost:8080/test/name1",
    })
    // The load test is driven by an Attacker
    attacker := vegeta.NewAttacker()

    var metrics vegeta.Metrics
    for res := range attacker.Attack(targeter, rate, duration, "Big Bang!") {
        // res is the *Result of one request; receiving it means that request has completed
        metrics.Add(res)
    }
    metrics.Close()

    fmt.Printf("99th percentile: %s\n", metrics.Latencies.P99)
}

Launching Vegeta from a Go program

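Since I need to send different values and control how many of them are sent, one option is to generate a file with a million lines and feed it to the CLI; the other is to start the attack from inside a program. For the extra flexibility, I wanted to try the latter.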

Stepping through the library example in a debugger leads to attack.Attack.

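According to the replies in the issue, all that is needed is a function that returns a Targeter. A Targeter is itself a function (type Targeter func(*Target) error), and a Target is, as I understand it, the request about to be sent; the source code describes it as an "HTTP request blueprint".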
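To make these shapes concrete without pulling in the library, the sketch below declares local stand-ins with the same shape as vegeta.Target (the real struct also has Body and Header fields) and vegeta.Targeter. newOnceTargeter is hypothetical: it hands out each path exactly once, overwriting *tgt as an output parameter, and then returns an error; in the hit source further down, a non-nil error from the targeter makes the attacker call Stop.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// Local stand-ins shaped like vegeta.Target and vegeta.Targeter.
type Target struct {
    Method string
    URL    string
}

type Targeter func(*Target) error

// errExhausted is a hypothetical sentinel returned once every path was used.
var errExhausted = errors.New("no more targets")

// newOnceTargeter (hypothetical) hands out each path exactly once, then fails.
// The mutex matters because Vegeta calls the targeter from many workers at once.
func newOnceTargeter(tmpl Target, paths []string) Targeter {
    var mu sync.Mutex
    i := 0
    return func(tgt *Target) error {
        mu.Lock()
        defer mu.Unlock()
        if i >= len(paths) {
            return errExhausted
        }
        *tgt = tmpl // tgt is an output parameter: overwrite it completely
        tgt.URL += "/" + paths[i]
        i++
        return nil
    }
}

func main() {
    tr := newOnceTargeter(Target{Method: "GET", URL: "http://localhost:8080/test"}, []string{"name1", "name2"})
    var tgt Target
    for tr(&tgt) == nil {
        fmt.Println(tgt.Method, tgt.URL)
    }
}
```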

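In the example, attacker.Attack returns a channel and plays the role of the vegeta attack command. The attacker itself is created by NewAttacker, which accepts options such as the maximum number of concurrent workers.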

Analysis of Attack

// Attack reads its Targets from the passed Targeter and attacks them at
// the rate specified by the Pacer. When the duration is zero the attack
// runs until Stop is called. Results are sent to the returned channel as soon
// as they arrive and will have their Attack field set to the given name.
func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <-chan *Result {
    var wg sync.WaitGroup

    // The Attacker supplies the initial worker count and the upper limit (maxWorkers)
    workers := a.workers
    if workers > a.maxWorkers {
        workers = a.maxWorkers
    }

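    // Channel of results returned to the caller
    results := make(chan *Result)
    ticks := make(chan struct{}) // ticks paces the attack: a worker must receive a tick before it sends a request
    for i := uint64(0); i < workers; i++ {
        wg.Add(1) // wait group
        go a.attack(tr, name, &wg, ticks, results) // each worker is a goroutine; it produces Results and sends them to results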
    }

    go func() {
        // Deferred calls run like a stack (LIFO): first close ticks, then wait for every a.attack worker to finish, and finally close results
        defer close(results)
        defer wg.Wait()
        defer close(ticks)

        began, count := time.Now(), uint64(0)
        for {
            elapsed := time.Since(began) // time elapsed since the attack began
            if du > 0 && elapsed > du { // du is the duration; once it has passed, stop
                return
            }

            wait, stop := p.Pace(elapsed, count) // the Pacer controls the rate
            if stop {
                return // the Pacer says the attack is over
            }

            time.Sleep(wait) // sleep until the next hit is due

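            if workers < a.maxWorkers { // the worker pool has not reached its limit yet
                select {
                case ticks <- struct{}{}: // hand out a tick; ticks is unbuffered, so this only succeeds if a worker is idle and receiving
                    count++
                    continue
                case <-a.stopch: // stop signal received: abort
                    return
                default:
                    // all workers are blocked. start one more and try again
                    // (the pool grows dynamically)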
                    workers++
                    wg.Add(1)
                    go a.attack(tr, name, &wg, ticks, results)
                }
            }

            select {
            case ticks <- struct{}{}:
                count++
            case <-a.stopch:
                return
            }
        }
    }()

    return results
}

attacker.attack

func (a *Attacker) attack(tr Targeter, name string, workers *sync.WaitGroup, ticks <-chan struct{}, results chan<- *Result) {
    defer workers.Done() // runs when the loop below ends, which only happens once ticks is closed
    for range ticks { // each iteration consumes one tick
        results <- a.hit(tr, name) // send the result to the results channel
    }
}
func (a *Attacker) hit(tr Targeter, name string) *Result {
    var (
        res = Result{Attack: name} // the result that will be returned
        tgt Target
        err error
    )

    a.seqmu.Lock() // lock so that reads and writes of the shared counter a.seq are safe
    res.Timestamp = a.began.Add(time.Since(a.began))
    res.Seq = a.seq
    a.seq++
    a.seqmu.Unlock() // unlock

    defer func() {
        res.Latency = time.Since(res.Timestamp)
        if err != nil {
            res.Error = err.Error()
        }
    }()
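    // This is where the targeter passed in as tr is finally called, which explains what it is for.
    // The tgt passed in carries no meaningful input; it is better thought of as a return value.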
    if err = tr(&tgt); err != nil {
        a.Stop()
        return &res
    }

    res.Method = tgt.Method
    res.URL = tgt.URL

    req, err := tgt.Request()
    if err != nil {
        return &res
    }

    if name != "" {
        req.Header.Set("X-Vegeta-Attack", name)
    }

    req.Header.Set("X-Vegeta-Seq", strconv.FormatUint(res.Seq, 10))

    if a.chunked {
        req.TransferEncoding = append(req.TransferEncoding, "chunked")
    }

    r, err := a.client.Do(req)
    if err != nil {
        return &res
    }
    defer r.Body.Close()

    body := io.Reader(r.Body)
    if a.maxBody >= 0 {
        body = io.LimitReader(r.Body, a.maxBody)
    }

    if res.Body, err = ioutil.ReadAll(body); err != nil {
        return &res
    } else if _, err = io.Copy(ioutil.Discard, r.Body); err != nil {
        return &res
    }

    res.BytesIn = uint64(len(res.Body))

    if req.ContentLength != -1 {
        res.BytesOut = uint64(req.ContentLength)
    }

    if res.Code = uint16(r.StatusCode); res.Code < 200 || res.Code >= 400 {
        res.Error = r.Status
    }

    res.Headers = r.Header

    return &res
}

Designing dynamic access by reading a file

With that, how to implement dynamic access is clear:

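  1. Implement a function that returns a targeter and takes the ids as a parameter, as described in this issue, or in this example code.
  2. Make each request different, in one of two ways:
    1. The example code uses random numbers. For my needs, I can read the required file into a slice and index it with a random number.
    2. Use a variable captured by the closure. The targeter is called once for every request (line 365 of attack.go), so incrementing a counter on each call walks through the slice. Since several workers call the targeter concurrently, the counter has to be updated atomically or under a lock.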
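Strategy 1 can be sketched as below, with local stand-ins for vegeta.Target and vegeta.Targeter (swap in the real types when using the library); newRandomTargeter is a hypothetical name, and the ids would normally come from a file reader such as readRealData further down. The package-level math/rand functions are safe for concurrent use, which matters here because Vegeta's workers call the targeter concurrently.

```go
package main

import (
    "fmt"
    "math/rand"
)

// Local stand-ins shaped like vegeta.Target and vegeta.Targeter.
type Target struct {
    Method string
    URL    string
}

type Targeter func(*Target) error

// newRandomTargeter (hypothetical) appends a uniformly random id to the template URL on every call.
func newRandomTargeter(tmpl Target, ids []string) Targeter {
    return func(tgt *Target) error {
        if len(ids) == 0 {
            return fmt.Errorf("no ids to choose from") // rand.Intn(0) would panic
        }
        *tgt = tmpl
        tgt.URL += "/" + ids[rand.Intn(len(ids))]
        return nil
    }
}

func main() {
    ids := []string{"TRA_1_index", "TRA_2_index", "TRA_3_index"}
    tr := newRandomTargeter(Target{Method: "GET", URL: "http://localhost:8080/test"}, ids)
    for i := 0; i < 5; i++ {
        var tgt Target
        if err := tr(&tgt); err != nil {
            fmt.Println("error:", err)
            return
        }
        fmt.Println(tgt.Method, tgt.URL)
    }
}
```

Random picks repeat and may leave some ids unused in a short run; the counter-based strategy visits them evenly.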

Below is a small demo of the idea: the server accepts /test/nameX and counts each name it receives, and the load generator sends random /test/XXX paths.

Server

package main

import (
    "log"
    "math/rand"
    "sync"
    "time"

    prometheusMiddleware "github.com/iris-contrib/middleware/prometheus"
    "github.com/kataras/iris/v12"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    app := iris.Default()
    registerCuckooFilter(app)
    registerPrometheus(app)
    app.Listen(":8080")
}

func registerCuckooFilter(app *iris.Application) {
    // The cuckoo filter is not wired in yet; for now a map counts how often each name arrives.
    // iris serves requests concurrently, so the map is guarded by a mutex
    // (unguarded concurrent map writes can crash the program under load).
    var mu sync.Mutex
    test := make(map[string]int)
    app.Get("/test/{name}", func(ctx iris.Context) {
        name := ctx.Params().Get("name")
        mu.Lock()
        test[name]++ // a missing key starts at 0
        mu.Unlock()
        ctx.StatusCode(iris.StatusOK)
    })
    app.Get("/get/{name}", func(ctx iris.Context) {
        name := ctx.Params().Get("name")
        mu.Lock()
        val, ok := test[name]
        mu.Unlock()
        if ok {
            log.Printf("key number is %v", val)
        } else {
            log.Println("key doesn't exist")
        }
        ctx.StatusCode(iris.StatusOK)
    })
    // TODO: query
    // batch operations:
    // batch insert
    // batch query
    // batch delete

}

func registerPrometheus(app *iris.Application){
    m := prometheusMiddleware.New("serviceName", 0.3, 1.2, 5.0)

    app.Use(m.ServeHTTP)

    app.OnErrorCode(iris.StatusNotFound, func(ctx iris.Context) {
        // error code handlers are not sharing the same middleware as other routes, so we have
        // to call them inside their body.
        m.ServeHTTP(ctx)

        ctx.Writef("Not Found")
    })

    app.Get("/", func(ctx iris.Context) {
        sleep := rand.Intn(4999) + 1
        time.Sleep(time.Duration(sleep) * time.Millisecond)
        ctx.Writef("Slept for %d milliseconds", sleep)
    })

    app.Get("/metrics", iris.FromStd(promhttp.Handler()))
}

Load generator

package main

import (
    "bufio"
    "fmt"
    "io"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "time"

    vegeta "github.com/tsenart/vegeta/v12/lib"
)

func main() {
    rate := vegeta.Rate{Freq: 100, Per: time.Second}
    duration := 4 * time.Second
    targeter := NewCustomTargeter(vegeta.Target{
        Method: "GET",
        URL:    "http://localhost:8080/test",
    })
    // The load test is driven by an Attacker
    attacker := vegeta.NewAttacker()

    var metrics vegeta.Metrics
    for res := range attacker.Attack(targeter, rate, duration, "random") {
        metrics.Add(res)
    }
    metrics.Close()

    fmt.Printf("99th percentile: %s\n", metrics.Latencies.P99)
}

func NewCustomTargeter(target vegeta.Target) vegeta.Targeter{
    // Read the id file (disabled in this demo; see readRealData below)
    //idData,err := readRealData()
    //if err != nil{
    //  panic(err)
    //}
    //cachedArray := make([]bool,len(idData))

    return func(tgt *vegeta.Target) error {
        // tgt is passed as a pointer and must be filled in. The HTTP request is built from tgt, so it is best thought of as a second return value
        *tgt = target
        tgt.URL += "/custom"+strconv.Itoa(rand.Intn(100))
        //tgt.Header = http.Header{}
        return nil
    }
}

func readRealData() ([]string, error) {
    // Read ids from id.txt
    filePath := "../../resources/id.txt"
    file, err := os.Open(filePath)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    // Each line is an audio id of the form `TRA_{albumid}_index`, where albumid has 6-10 digits
    // and index has at most 5 digits. Read them as strings into a slice.
    idData := make([]string, 0, 1400000)
    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        // Skip blank lines, including the empty "line" after a trailing newline,
        // which would otherwise become a request to /test/.
        if line = strings.TrimSpace(line); line != "" {
            idData = append(idData, line)
        }
        if err != nil {
            if err == io.EOF {
                break
            }
            return nil, err
        }
    }
    return idData, nil
}

Afterwards, send curl http://localhost:8080/get/custom54; the server log then shows how many times that name was received.

wu0h961738 commented 2 years ago

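Hi, could you explain in more detail the part of the "Designing dynamic access by reading a file" section that says incrementing a counter lets you walk through the data? I read your NewCustomTargeter example and still don't get it T_T. My question is how the targeter should be written so that it counts. Thanks.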

wtysos11 commented 2 years ago


Sorry, the demo I wrote back then differs from the one above; that sentence was about how to iterate over the lines of a given txt file. I made a new demo:


The server code is the same as before.

id.txt

TRA_1_index
TRA_2_index
TRA_3_index
TRA_4_index
TRA_5_index

Load generator

package main

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "strings"
    "sync/atomic"
    "time"

    vegeta "github.com/tsenart/vegeta/v12/lib"
)

func main() {
    rate := vegeta.Rate{Freq: 100, Per: time.Second}
    duration := 4 * time.Second
    targeter := NewCustomTargeter(vegeta.Target{
        Method: "GET",
        URL:    "http://localhost:8080/test",
    })
    // The load test is driven by an Attacker
    attacker := vegeta.NewAttacker()

    var metrics vegeta.Metrics
    for res := range attacker.Attack(targeter, rate, duration, "random") {
        metrics.Add(res)
    }
    metrics.Close()

    fmt.Printf("99th percentile: %s\n", metrics.Latencies.P99)
}

func NewCustomTargeter(target vegeta.Target) vegeta.Targeter {
    // Read the id file once, up front
    idData, err := readRealData()
    if err != nil {
        panic(err)
    }
    if len(idData) == 0 {
        panic("id.txt contains no ids")
    }
    // x works like a static variable kept alive by the closure. Vegeta calls the targeter
    // from several worker goroutines at once, so x is advanced atomically; a plain x++
    // followed by a wrap-around check is a data race and can index past the end of idData.
    var x uint64
    return func(tgt *vegeta.Target) error {
        // tgt is passed as a pointer and must be filled in. The HTTP request is built from tgt,
        // so it is best thought of as a second return value.
        *tgt = target
        i := (atomic.AddUint64(&x, 1) - 1) % uint64(len(idData)) // 0, 1, ..., len-1, 0, ...
        tgt.URL += "/" + idData[i]
        return nil
    }
}

func readRealData() ([]string, error) {
    // Read ids from id.txt
    filePath := "id.txt"
    file, err := os.Open(filePath)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    // Each line is an audio id of the form `TRA_{albumid}_index`, where albumid has 6-10 digits
    // and index has at most 5 digits. Read them as strings into a slice.
    idData := make([]string, 0, 1400000)
    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        // Skip blank lines, including the empty "line" after a trailing newline,
        // which would otherwise become a request to /test/.
        if line = strings.TrimSpace(line); line != "" {
            idData = append(idData, line)
        }
        if err != nil {
            if err == io.EOF {
                break
            }
            return nil, err
        }
    }
    return idData, nil
}

The server-side log shows:

2022-06-07 16:55:49|14.132µs|200|GET|/test/TRA_1_index|::1|name=TRA_1_index|0 B|0 B||
2022-06-07 16:55:49|20.895µs|200|GET|/test/TRA_2_index|::1|name=TRA_2_index|0 B|0 B||
2022-06-07 16:55:49|20.918µs|200|GET|/test/TRA_3_index|::1|name=TRA_3_index|0 B|0 B||
2022-06-07 16:55:49|21.048µs|200|GET|/test/TRA_4_index|::1|name=TRA_4_index|0 B|0 B||
2022-06-07 16:55:49|12.286µs|200|GET|/test/TRA_5_index|::1|name=TRA_5_index|0 B|0 B||
2022-06-07 16:55:49|28.04µs|200|GET|/test/TRA_1_index|::1|name=TRA_1_index|0 B|0 B||
2022-06-07 16:55:49|49.178µs|200|GET|/test/TRA_2_index|::1|name=TRA_2_index|0 B|0 B||
2022-06-07 16:55:49|29.287µs|200|GET|/test/TRA_3_index|::1|name=TRA_3_index|0 B|0 B||
2022-06-07 16:55:49|45.1µs|200|GET|/test/TRA_4_index|::1|name=TRA_4_index|0 B|0 B||
2022-06-07 16:55:49|17.428µs|200|GET|/test/TRA_5_index|::1|name=TRA_5_index|0 B|0 B||

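This walks through every line of id.txt in turn. The targeter in question is NewCustomTargeter in the load generator above, where x can be viewed as a static variable implemented with a closure.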