zshuangyan / blog

我的个人博客
2 stars 0 forks source link

open-falcon源码学习(二):Agent从HBS同步内置监控项 #14

Open zshuangyan opened 6 years ago

zshuangyan commented 6 years ago

Agent端

入口:main函数中执行cron.SyncBuiltinMetrics() 代码位置: agent/cron/builtin.go

func SyncBuiltinMetrics() {
    if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
        go syncBuiltinMetrics()
    }
}

SyncBuiltinMetrics()确认配置文件中启用了心跳并且心跳地址不为空后,就开启了一个协程来执行syncBuiltinMetrics()函数。

func syncBuiltinMetrics() {
    duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second

    for {
        time.Sleep(duration)

        var ports = []int64{}
        var paths = []string{}
        var procs = make(map[string]map[int]string)
        var urls = make(map[string]string)

        hostname, _ := g.Hostname()

        req := model.AgentHeartbeatRequest{
            Hostname: hostname,
            Checksum: checksum,
        }

        var resp model.BuiltinMetricResponse
        g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)

        for _, metric := range resp.Metrics {

            if metric.Metric == g.URL_CHECK_HEALTH {
                arr := strings.Split(metric.Tags, ",")
                if len(arr) != 2 {
                    continue
                }
                url := strings.Split(arr[0], "=")
                if len(url) != 2 {
                    continue
                }
                stime := strings.Split(arr[1], "=")
                if len(stime) != 2 {
                    continue
                }
                if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil {
                    urls[url[1]] = stime[1]
                } else {
                    log.Println("metric ParseInt timeout failed:", err)
                }
            }
                        ...

            if metric.Metric == g.PROC_NUM {
                arr := strings.Split(metric.Tags, ",")

                tmpMap := make(map[int]string)

                for i := 0; i < len(arr); i++ {
                    if strings.HasPrefix(arr[i], "name=") {
                        tmpMap[1] = strings.TrimSpace(arr[i][5:])
                    } else if strings.HasPrefix(arr[i], "cmdline=") {
                        tmpMap[2] = strings.TrimSpace(arr[i][8:])
                    }
                }

                procs[metric.Tags] = tmpMap
            }
        }

        g.SetReportUrls(urls)
        g.SetReportPorts(ports)
        g.SetReportProcs(procs)
        g.SetDuPaths(paths)

    }
}

注意: 这里省略了一些差错校验逻辑(包括双方通信过程中的checksum)

每等待一个心跳时间,就执行一次同步操作,同步操作的流程为:

  1. 调用g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp),把HBS服务器返回的结果储存在resp中
  2. 遍历resp,分别提取出urls,ports,procs,paths等信息
  3. 通过g.SetXXX函数设置url,port,proc,path信息

HbsClient是g模块的包级变量,在main函数执行g.InitRpcClients()时被初始化,它是结构体SingleConnRpcClient实例的指针。

type SingleConnRpcClient struct {
    sync.Mutex
    rpcClient *rpc.Client
    RpcServer string
    Timeout   time.Duration
}

初始化:

func InitRpcClients() {
    if Config().Heartbeat.Enabled {
        HbsClient = &SingleConnRpcClient{
            RpcServer: Config().Heartbeat.Addr,
            Timeout:   time.Duration(Config().Heartbeat.Timeout) * time.Millisecond,
        }
    }
}

SingleConnRpcClient结构体有多个方法,包括Close,serverConn和Call:

func (this *SingleConnRpcClient) close() {
    if this.rpcClient != nil {
        this.rpcClient.Close()
        this.rpcClient = nil
    }
}

func (this *SingleConnRpcClient) serverConn() error {
    var err error
    var retry int = 1

    for {
        this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
        if err != nil {
            log.Printf("dial %s fail: %v", this.RpcServer, err)
            if retry > 3 {
                return err
            }
            time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
            retry++
            continue
        }
        return err
    }
}

可以看到agent和HBS服务器创建连接的过程中有重试机制(3次),通过调用github.com/toolkits/net包下的JsonRpcClient函数来实现和HBS服务器的连接。

func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {

    this.Lock()
    defer this.Unlock()

    err := this.serverConn()
    if err != nil {
        return err
    }

    timeout := time.Duration(10 * time.Second)
    done := make(chan error, 1)

    go func() {
        err := this.rpcClient.Call(method, args, reply)
        done <- err
    }()

    select {
    case <-time.After(timeout):
        log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
        this.close()
        return errors.New(this.RpcServer + " rpc call timeout")
    case err := <-done:
        if err != nil {
            this.close()
            return err
        }
    }

    return nil
}

g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)的具体执行流程如下:

  1. 加锁
  2. 创建和HBS服务器的连接
  3. 开启一个协程,执行this.rpcClient.Call("Agent.BuiltinMetrics", req, &resp)
  4. 如果在超时前HBS返回响应结果,则执行HbsClient.close()并返回响应结果给调用函数;否则,返回超时错误

HBS端

接下来我们看看HBS是怎样实现Agent.BuiltinMetrics函数的 源码位置:hbs/rpc/agent.go

func (t *Agent) BuiltinMetrics(args *model.AgentHeartbeatRequest, reply *model.BuiltinMetricResponse) error {
    metrics, _ := cache.GetBuiltinMetrics(args.Hostname)

    checksum := ""
    if len(metrics) > 0 {
        checksum = DigestBuiltinMetrics(metrics)
    }

    if args.Checksum == checksum {
        reply.Metrics = []*model.BuiltinMetric{}
    } else {
        reply.Metrics = metrics
    }
    reply.Checksum = checksum
    reply.Timestamp = time.Now().Unix()

    return nil
}

注意: 这里省略了一些差错校验逻辑(包括双方通信过程中的checksum)

这里使用checksum来判断HBS端的metrics是否和agent端的metrics相同,如果相同则在reply.metrics中返回空切片,即不用更新。

接下来我们看看cache.GetBuiltinMetrics函数:

func GetBuiltinMetrics(hostname string) ([]*model.BuiltinMetric, error) {
    ret := []*model.BuiltinMetric{}
    hid, _ := HostMap.GetID(hostname)
    gids, _ := HostGroupsMap.GetGroupIds(hid)

    // 根据gids,获取绑定的所有tids
    tidSet := set.NewIntSet()
    for _, gid := range gids {
        tids, _ := GroupTemplates.GetTemplateIds(gid)
        for _, tid := range tids {
            tidSet.Add(tid)
        }
    }

    tidSlice := tidSet.ToSlice()

    // 继续寻找这些tid的ParentId
    allTpls := TemplateCache.GetMap()
    for _, tid := range tidSlice {
        pids := ParentIds(allTpls, tid)
        for _, pid := range pids {
            tidSet.Add(pid)
        }
    }

    // 终于得到了最终的tid列表
    tidSlice = tidSet.ToSlice()

    // 把tid列表用逗号拼接在一起
    count := len(tidSlice)
    tidStrArr := make([]string, count)
    for i := 0; i < count; i++ {
        tidStrArr[i] = strconv.Itoa(tidSlice[i])
    }

    return db.QueryBuiltinMetrics(strings.Join(tidStrArr, ","))
}

注意: 这里省略了一些差错校验逻辑

open-falcon通过在模板上配置监控项,并且通过主机组管理到模板来下发监控项,因此我们要逆向找到agent所在主机对应的主机组,再通过主机组找到所有关联的模板,最后通过模板找到监控项。

代码的流程很清晰,主要分为以下几步:

  1. 获取agent所在主机的id(hid)
  2. 获取主机管理的所有主机组id的列表(gids)
  3. 遍历每个主机组,把每个主机组管理的所有模板加入到集合tidSet中,最后把tidSet转化为tidSlice
  4. 遍历每个模板,把每个模板的母模板也加入到allTpls中,最后调用数据库接口查询所有模板包含的监控项的集合