zshuangyan / blog

我的个人博客
2 stars 0 forks source link

open-falcon源码学习:Agent模块定时执行采集任务 #13

Open zshuangyan opened 6 years ago

zshuangyan commented 6 years ago

Agent模块负责数据采集,它的代码结构如下: image

和其他编程语言一样,Go程序执行的入口也是main函数,open-falcon的main函数位于main.go文件中:

package main

/* 导入依赖模块,flag,os和fmt是标准库中的模块,另外几个模块是本地的包*/
import (
    "flag"
    "fmt"
    "github.com/open-falcon/falcon-plus/modules/agent/cron"
    "github.com/open-falcon/falcon-plus/modules/agent/funcs"
    "github.com/open-falcon/falcon-plus/modules/agent/g"
    "github.com/open-falcon/falcon-plus/modules/agent/http"
    "os"
)

func main() {
        // flag模块用来处理命令行,类似Python的argparse模块
    cfg := flag.String("c", "cfg.json", "configuration file")
    version := flag.Bool("v", false, "show version")
    check := flag.Bool("check", false, "check collector")

    flag.Parse()

    if *version {
        fmt.Println(g.VERSION)
        os.Exit(0)
    }

    if *check {
        funcs.CheckCollector()
        os.Exit(0)
    }

    g.ParseConfig(*cfg)

    if g.Config().Debug {
        g.InitLog("debug")
    } else {
        g.InitLog("info")
    }

    g.InitRootDir()
    g.InitLocalIp()
    g.InitRpcClients()

    funcs.BuildMappers()

    go cron.InitDataHistory()

    cron.ReportAgentStatus()
    cron.SyncMinePlugins()
    cron.SyncBuiltinMetrics()
    cron.SyncTrustableIps()
    cron.Collect()

    go http.Start()

    select {}

}

可以看到,在main函数中调用了g,funcs,cron和http四个模块的函数,g模块主要用来做配置加载和初始化的工作,cron模块主要负责定时任务的调度,http模块负责接收http请求,funcs模块负责具体的采集工作。下面我们分模块讲解一下agent的工作机制。

定时执行采集任务

入口:main函数中的cron.Collect() 源码位置:agent/cron/collector.go

Collect 函数:

func Collect() {

    if !g.Config().Transfer.Enabled {
        return
    }

    if len(g.Config().Transfer.Addrs) == 0 {
        return
    }

    for _, v := range funcs.Mappers {
        go collect(int64(v.Interval), v.Fs)
    }
}

可以看到,Collect函数中对配置文件中Transfer的启用状态和地址进行校验后,就遍历了指标列表funcs.Mappers,然后对每个指标开启一个协程,执行collect函数。

Mappers是funcs的包级变量,在main函数调用funcs.BuildMappers()时被初始化,它是定义在funcs.go文件中的结构体FuncsAndInterval的切片,FuncsAndInterval结构体的定义如下:

type FuncsAndInterval struct {
    Fs       []func() []*model.MetricValue
    Interval int
}

可以看到,FuncsAndInterval结构体的第一个变量是【返回类型为【model.MetricValue结构体的指针的切片】的函数】的切片。我们来看下funcs.BuildMappers()函数:

func BuildMappers() {
    interval := g.Config().Transfer.Interval
    Mappers = []FuncsAndInterval{
        {
            Fs: []func() []*model.MetricValue{
                AgentMetrics,
                CpuMetrics,
                NetMetrics,
                KernelMetrics,
                LoadAvgMetrics,
                MemMetrics,
                DiskIOMetrics,
                IOStatsMetrics,
                NetstatMetrics,
                ProcMetrics,
                UdpMetrics,
            },
            Interval: interval,
        },
                ...
        {
            Fs: []func() []*model.MetricValue{
                GpuMetrics,
            },
            Interval: interval,
        },
    }
}

可以用下图来表示: image

分析完Mappers切片的结构后,我们来看看collect()函数:

func collect(sec int64, fns []func() []*model.MetricValue) {
    t := time.NewTicker(time.Second * time.Duration(sec))
    defer t.Stop()
    for {
        <-t.C

        hostname, err := g.Hostname()
        if err != nil {
            continue
        }

        mvs := []*model.MetricValue{}
        ignoreMetrics := g.Config().IgnoreMetrics

        for _, fn := range fns {
            items := fn()
            if items == nil {
                continue
            }

            if len(items) == 0 {
                continue
            }

            for _, mv := range items {
                if b, ok := ignoreMetrics[mv.Metric]; ok && b {
                    continue
                } else {
                    mvs = append(mvs, mv)
                }
            }
        }

        now := time.Now().Unix()
        for j := 0; j < len(mvs); j++ {
            mvs[j].Step = sec
            mvs[j].Endpoint = hostname
            mvs[j].Timestamp = now
        }

        g.SendToTransfer(mvs)

collect函数使用time模块实现了一个定时器(定时器的实现参考)每隔一定时间就执行一遍最外层的for循环。

在大括号的for循环里边还有一个嵌套循环,用来遍历执行fns的每个函数,以CpuMetrics函数为例:

func CpuMetrics() []*model.MetricValue {
    if !CpuPrepared() {
        return []*model.MetricValue{}
    }

    cpuIdleVal := CpuIdle()
    idle := GaugeValue("cpu.idle", cpuIdleVal)
    busy := GaugeValue("cpu.busy", 100.0-cpuIdleVal)
    user := GaugeValue("cpu.user", CpuUser())
    nice := GaugeValue("cpu.nice", CpuNice())
    system := GaugeValue("cpu.system", CpuSystem())
    iowait := GaugeValue("cpu.iowait", CpuIowait())
    irq := GaugeValue("cpu.irq", CpuIrq())
    softirq := GaugeValue("cpu.softirq", CpuSoftIrq())
    steal := GaugeValue("cpu.steal", CpuSteal())
    guest := GaugeValue("cpu.guest", CpuGuest())
    switches := CounterValue("cpu.switches", CurrentCpuSwitches())
    return []*model.MetricValue{idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches}
}

每个函数的返回结果都是*Model.MetricValue类型的切片,通过使用一个内层的循环,把返回结果填入到mvs中。

注意:由于collect函数是以协程的方式执行的,因此Collect函数遍历处理Mappers的过程中不会出现相互阻塞或干扰。