zshuangyan / blog

我的个人博客
2 stars 0 forks source link

open-falcon dashboard实现分析 #16

Open zshuangyan opened 6 years ago

zshuangyan commented 6 years ago

进入open-falcon的dashboard界面时,页面的左侧展示了目前纳入监控的主机的列表,默认只展示50条,可以通过主机名查找和主机关联标签查找的方法找到你关注的host列表: image

涉及到的API接口: URL: http://10.202.42.2:8081/api/endpoints?q=gs-server&tags=&limit=50&page=1&_r=0.6087057948380055 方法: GET 接口:http://ip:port/api/endpoints 参数:q=gs-server tags=空

对应的Python代码

@app.route("/api/endpoints")
def api_endpoints():
    """Return the endpoints (hosts) matching the query string as JSON.

    Query parameters:
        q:     endpoint-name search keyword; "." when absent or empty.
        tags:  comma-separated tags; whitespace around each comma is removed.
        limit: page size; 100 when absent or empty.
        page:  page number; 1 when absent or empty.

    Responds with a JSON object holding "ok", "msg" and "data" on success,
    and aborts with HTTP 400 carrying the error text when the lookup raises.
    """
    ret = {
            "ok": False,
            "msg": "",
            "data": [],
            }

    q = request.args.get("q") or "."
    raw_tag = request.args.get("tags") or ""
    # Raw string: "\s" must reach the regex engine, not the string parser.
    tags = ','.join(re.split(r'\s*,\s*', raw_tag))
    limit = int(request.args.get("limit") or 100)
    page = int(request.args.get("page") or 1)

    try:
        data = get_api_endpoints(q, tags, page, limit)
        ret['data'] = data
        ret['ok'] = True
        return json.dumps(ret)
    except Exception as e:
        # Report what actually failed; `ret` never records the error
        # (its "msg" stays empty), so dumping it told the caller nothing.
        abort(400, str(e))

可以看到,api_endpoints这个函数从请求中提取出主机名的查找关键字,tags以及分页的信息后,调用了get_api_endpoints方法来获取endpoints(即主机)列表。

def get_api_endpoints(q, tags, page=1, limit=100):
    """Fetch one page of endpoints from the open-falcon API, sorted by name.

    Raises Exception when neither q nor tags is given, and when the API
    answers with a status other than 200 (the message is the response text).
    """
    if not (q or tags):
        raise Exception("no query params given")

    url = config.API_ADDR + "/graph/endpoint?q=%s&limit=%d&page=%d&tags=%s" % (
        q, limit, page, tags)
    resp = corelib.auth_requests(
        "GET", url, headers={"Content-type": "application/json"})
    if resp.status_code != 200:
        raise Exception(resp.text)

    # Order the decoded list by each entry's "endpoint" field.
    return sorted(resp.json(), key=lambda item: item["endpoint"])

从这段代码中可以看出dashboard模块解析用户的请求后,调用了open-falcon的api接口/graph/endpoint来获取endpoints列表,dashboard本身没有对数据库的查询操作。下面是该接口在open-falcon中的Go实现:

// EndpointRegexpQuery looks up endpoints by tag labels and/or name keywords
// and writes them to the client as a JSON list of {"id", "endpoint"} objects.
//
// Labels (comma separated) are matched against endpoint_counter.counter with
// LIKE, keywords (space separated) against endpoint.endpoint with REGEXP;
// each term adds one more Where condition to the query.
func EndpointRegexpQuery(c *gin.Context) {
    // Parameter parsing and part of the validation are omitted from this excerpt.
    ...

    labels := []string{}
    if inputs.Label != "" {
        labels = strings.Split(inputs.Label, ",")
    }
    qs := []string{}
    if inputs.Q != "" {
        qs = strings.Split(inputs.Q, " ")
    }

    offset := 0
    if inputs.Page > 1 {
        offset = (inputs.Page - 1) * inputs.Limit
    }

    var endpoint []m.Endpoint
    var endpointIDs []int
    var dt *gorm.DB
    // When tags are given, resolve them to endpoint ids through the
    // endpoint_counter table first.
    if len(labels) != 0 {
        dt = db.Graph.Table("endpoint_counter").Select("distinct endpoint_id")
        for _, term := range labels {
            dt = dt.Where(" counter like ? ", "%"+strings.TrimSpace(term)+"%")
        }
        dt = dt.Limit(inputs.Limit).Offset(offset).Pluck("distinct endpoint_id", &endpointIDs)
        if dt.Error != nil {
            h.JSONR(c, http.StatusBadRequest, dt.Error)
            return
        }
    }
    if len(qs) != 0 {
        // When name keywords are given, match each of them against the
        // endpoint name, restricted to the ids found above (if any).
        // NOTE(review): labels that matched no id do not restrict this query,
        // and limit/offset are applied here a second time — confirm intended.
        dt = db.Graph.Table("endpoint").
            Select("endpoint, id")
        if len(endpointIDs) != 0 {
            dt = dt.Where("id in (?)", endpointIDs)
        }

        for _, term := range qs {
            dt = dt.Where(" endpoint regexp ? ", strings.TrimSpace(term))
        }
        // Keep the handle returned by Scan: the error check below reads
        // dt.Error, and a discarded result would hide a failed query.
        dt = dt.Limit(inputs.Limit).Offset(offset).Scan(&endpoint)
    } else if len(endpointIDs) != 0 {
        dt = db.Graph.Table("endpoint").
            Select("endpoint, id").
            Where("id in (?)", endpointIDs).
            Scan(&endpoint)
    }
    // dt is still nil when neither labels nor keywords were supplied.
    if dt != nil && dt.Error != nil {
        h.JSONR(c, http.StatusBadRequest, dt.Error)
        return
    }

    // Non-nil slice so that an empty result is encoded as [] rather than null.
    endpoints := []map[string]interface{}{}
    for _, e := range endpoint {
        endpoints = append(endpoints, map[string]interface{}{"id": e.ID, "endpoint": e.Endpoint})
    }

    h.JSONR(c, endpoints)
}

选择主机后,点击刷新counter列表,可以查询出这些主机对应的监控项(counter),对应的接口为 URL:http://10.202.42.2:8081/api/counters 方法:POST 参数: _r 0.4806078226590633 eids ["349","1"] limit 50 page 1 q

对应的Python代码:

@app.route("/api/counters", methods=["POST"])
def api_get_counters():
    """Return the counters of the selected endpoints as JSON.

    Queries the /graph/endpoint_counter API and responds with a JSON object
    holding "ok", "msg" and "data", where "data" is a list of
    [counter, type, step] entries sorted by counter name. Aborts with
    HTTP 400 carrying the response text when the API does not answer 200.
    """
    ret = {
            "ok": False,
            "msg": "",
            "data": [],
            }
    # Parameter parsing and part of the validation are omitted from this
    # excerpt; the code below relies on it to define eids, q, limit and page.
    ...

    h = {"Content-type": "application/json"}
    r = corelib.auth_requests("GET", config.API_ADDR + "/graph/endpoint_counter?eid=%s&metricQuery=%s&limit=%d&page=%d" %(",".join(eids), q, limit, page), headers=h)
    if r.status_code != 200:
        abort(400, r.text)
    j = r.json()

    # Keyed by counter name, so a counter reported for several endpoints is
    # listed once (the last entry seen wins).
    counters_map = {}
    for x in j:
        counters_map[x['counter']] = [x['counter'], x['type'], x['step']]
    sorted_counters = sorted(counters_map.keys())
    sorted_values = [counters_map[x] for x in sorted_counters]

    ret['data'] = sorted_values
    ret['ok'] = True

    return json.dumps(ret)

dashboard模块依旧是调用了open-falcon的API接口获取监控项的列表,下面我们来看看这个接口的实现

// EndpointCounterRegexpQuery responds with the counters stored for the
// requested endpoint ids, optionally narrowed by space-separated regular
// expressions that are each applied to the counter name.
func EndpointCounterRegexpQuery(c *gin.Context) {
    // Parameter parsing and part of the validation are omitted from this excerpt.
    ...

    if eid == "" {
        h.JSONR(c, http.StatusBadRequest, "eid is missing")
        return
    }

    idList := utils.ConverIntStringToList(eid)
    if idList == "" {
        h.JSONR(c, http.StatusBadRequest, "input error, please check your input info.")
        return
    }
    inClause := fmt.Sprintf("(%s)", idList)

    query := db.Graph.Table("endpoint_counter").
        Select("endpoint_id, counter, step, type").
        Where(fmt.Sprintf("endpoint_id IN %s", inClause))
    if metricQuery != "" {
        // One additional REGEXP condition per space-separated term.
        for _, term := range strings.Split(metricQuery, " ") {
            query = query.Where("counter regexp ?", strings.TrimSpace(term))
        }
    }

    var rows []m.EndpointCounter
    query = query.Limit(limit).Offset(offset).Scan(&rows)
    if query.Error != nil {
        h.JSONR(c, http.StatusBadRequest, query.Error)
        return
    }

    // Non-nil slice so that an empty result is encoded as [] rather than null.
    countersResp := make([]interface{}, 0, len(rows))
    for _, row := range rows {
        countersResp = append(countersResp, map[string]interface{}{
            "endpoint_id": row.EndpointID,
            "counter":     row.Counter,
            "step":        row.Step,
            "type":        row.Type,
        })
    }
    h.JSONR(c, countersResp)
}

可以看到/graph/endpoint_counter这个API返回了与选中的endpoint关联的监控项列表(按limit和page分页),列表中的每一项是map类型的,包含了监控项的类型,统计频率,名字,关联的主机id等信息。

点击页面右上角的查看图表的按钮,会链接到数据展示的界面 image

对应的接口为: URL:http://10.202.42.2:8081/chart 方法:POST 参数: _r 0.6320655494664266 counters[] […] 0 cpu.busy 1 cpu.guest 2 cpu.idle endpoints[] […] 0 gs-server-3238 1 gs-server-4171 graph_type k

对应的Python实现

@app.route("/chart", methods=["POST",])
def chart():
    """Register a temporary graph for the posted endpoints and counters.

    Responds with JSON holding the tmp graph id, the requested graph type
    (GRAPH_TYPE_HOST when none is posted) and an "ok" flag that is true only
    when an id was obtained.
    """
    form = request.form
    endpoints = form.getlist("endpoints[]") or []
    counters = form.getlist("counters[]") or []
    graph_type = form.get("graph_type") or GRAPH_TYPE_HOST

    id_ = TmpGraph.add(endpoints, counters)
    return json.dumps({
        "ok": bool(id_),
        "id": id_,
        "params": {
            "graph_type": graph_type,
        },
    })

可以看到这个函数调用了TmpGraph.add(endpoints, counters)的方法,并得到了返回值id_,下面我们来看看TmpGraph.add方法具体做了哪些事情

    @classmethod
    def add(cls, endpoints, counters):
        """Create a tmp graph through the /dashboard/tmpgraph API and return its id.

        Raises Exception carrying the response text when the API does not
        answer 200. A falsy decoded body is returned unchanged.
        """
        payload = json.dumps({
            "endpoints": endpoints,
            "counters": counters,
        })
        resp = corelib.auth_requests(
            "POST",
            API_ADDR + "/dashboard/tmpgraph",
            headers={'Content-type': 'application/json'},
            data=payload,
        )
        if resp.status_code != 200:
            raise Exception(resp.text)

        body = resp.json()
        if not body:
            return body
        return body.get('id')

可以看到TmpGraph.add方法向open-falcon的/dashboard/tmpgraph这个api发送了请求,下面我们来看看 /dashboard/tmpgraph这个api的实现:

// DashboardTmpGraphCreate stores the endpoint/counter selection in the
// tmp_graph table, keyed by a checksum of the two joined lists, and responds
// with the id of the stored row.
func DashboardTmpGraphCreate(c *gin.Context) {
    // Parameter parsing and part of the validation are omitted from this excerpt.
    ...
    // Sort before joining so the same selection always produces the same key.
    sort.Strings(es)
    sort.Strings(cs)

    endpointsField := strings.Join(es, TMP_GRAPH_FILED_DELIMITER)
    countersField := strings.Join(cs, TMP_GRAPH_FILED_DELIMITER)
    checksum := cutils.Md5(endpointsField + ":" + countersField)

    // Insert the row, or refresh time_ when the key already exists.
    const upsert = "insert ignore into `tmp_graph` (endpoints, counters, ck) values(?, ?, ?) on duplicate key update time_=?"
    if res := db.Dashboard.Exec(upsert, endpointsField, countersField, checksum, time.Now()); res.Error != nil {
        h.JSONR(c, badstatus, res.Error)
        return
    }

    // Read the row back by its checksum to learn the id.
    var saved m.DashboardTmpGraph
    if res := db.Dashboard.Table("tmp_graph").Where("ck=?", checksum).First(&saved); res.Error != nil {
        h.JSONR(c, badstatus, res.Error)
        return
    }

    h.JSONR(c, map[string]int{"id": int(saved.ID)})
}

分别把endpoints列表和counters列表拼成字符串,然后计算出相应的md5值,插入到tmp_graph中,得到对应的数据库的id,返回给前端,然后前端再利用这个id来发送下面的请求。

URL:http://10.202.42.2:8081/chart/a?cf=AVERAGE&end=&graph_type=a&id=69&start=&sum=off 方法:GET

对应Python的接口:

@app.route("/chart/a", methods=["GET"])
def multi_chart_data():
    """Collect the data for a combined chart of a stored tmp graph.

    Aborts with HTTP 400 when no graph id is given, when the tmp graph does
    not exist, or when it has no counters or no endpoints.
    This excerpt stops after the history query; the rest is not shown.
    """
    if not g.id:
        abort(400, "no graph id given")

    # Look up the counters and endpoints stored for this tmp graph id.
    j = TmpGraph.get(g.id)
    if not j:
        abort(400, "no such tmp_graph where id=%s" %g.id)

    counters = j.counters
    if not counters:
        abort(400, "no counters of %s" %g.id)
    # De-duplicate and order the counters.
    counters = sorted(set(counters))

    endpoints = j.endpoints
    if not endpoints:
        abort(400, "no endpoints of %s, and tags:%s" %(g.id, g.tags))
    # De-duplicate and order the endpoints.
    endpoints = sorted(set(endpoints))

    ret = {
        "units": "",
        "title": "",
        "series": []
    }

    # Fetch the monitoring data; per the original note, graph_history calls
    # open-falcon's /graph/history API.
    query_result = graph_history(endpoints, counters, g.cf, g.start, g.end)

我们看一下open-falcon的/graph/history接口是怎样实现的:

// QueryGraphDrawData collects the history data of every requested
// host/counter pair and writes the results to the client as a JSON list.
// Pairs whose step lookup or data fetch fails are left out of the response.
func QueryGraphDrawData(c *gin.Context) {
    // Parameter parsing and part of the validation are omitted from this excerpt.
    ...
    respData := []*cmodel.GraphQueryResponse{}
    for _, host := range inputs.HostNames {
        for _, counter := range inputs.Counters {
            var step int
            if inputs.Step > 0 {
                step = inputs.Step
            } else {
                // No positive step in the request: look it up for this counter.
                step, err = getCounterStep(host, counter)
                if err != nil {
                    continue
                }
            }
            data, fetchErr := fetchData(host, counter, inputs.ConsolFun, inputs.StartTime, inputs.EndTime, step)
            if fetchErr != nil {
                // Skip the pair, as for a failed step lookup, rather than
                // appending a result that is meaningless after an error.
                continue
            }
            respData = append(respData, data)
        }
    }
    h.JSONR(c, respData)
}

使用了两个for循环,针对每个endpoint的每个counter,找到监控项对应的采集频率,然后执行fetchData的操作,可以看到open-falcon的/graph/history这个API返回的是cmodel.GraphQueryResponse类型的指针的切片,接下来我们看看dashboard模块是怎样处理返回的数据的。

在Dashboard图表的界面上,我们可以看到支持endpoint(主机)视角,counter(监控项)视角和组合视角: image

对应的代码逻辑:

   //监控项视角
    if g.graph_type == GRAPH_TYPE_KEY:
        //多少个主机,就有多少个URL,最终展示多少个图
        for x in endpoints:
            id_ = TmpGraph.add([x], counters)
            if not id_:
                continue

            p["id"] = id_
            chart_ids.append(int(id_))
            src = "/chart/h?" + urllib.urlencode(p)
            chart_urls.append(src)
   //主机视角
    elif g.graph_type == GRAPH_TYPE_HOST:
        //多少个监控项,就有多少个URL,最终展示多少个图
        for x in counters:
            id_ = TmpGraph.add(endpoints, [x])
            if not id_:
                continue
            p["id"] = id_
            chart_ids.append(int(id_))
            src = "/chart/h?" + urllib.urlencode(p)
            chart_urls.append(src)
    else:
    //组合视角,只用展示一张图
        id_ = TmpGraph.add(endpoints, counters)
        if id_:
            p["id"] = id_
            chart_ids.append(int(id_))
            src = "/chart/a?" + urllib.urlencode(p)
            chart_urls.append(src)

    return render_template("chart/multi_ng.html", **locals())