cjuexuan / mynote

237 stars 34 forks source link

规则引擎中前缀树的应用 #49

Open cjuexuan opened 6 years ago

cjuexuan commented 6 years ago

背景

最近在重构告警的规则引擎,有一个这样的场景,我们的规则会描述为 domain#metric {tagKey1=tagVal1,tagKey2=tagVal2} func compareMethod threshold

比如 spoor.dashboard#jvm.mem.heap.used {endpoint=localhost} each > 100,olap.xqlserver_yarnAppid#cpu.load.avg {service=xql,idc-sh-bs} max[10m] >= 2 我们会按照用户描述的规则去构建一棵规则树

TimeSeries的结构如下:

case class TimeSeries(
  metric:     String,
  tags:       Seq[(String, String)],
  valueType:  Int,
  value:      Any,
  metricType: Int,
  timestamp:  Long,
  domain:     String
)

所以我们会先按照domain,metric和tags过滤数据,找到在规则树中对应的规则,然后用value更新规则

这对于一般的app应用没啥问题,比如这个服务就是用来做实时feed流的,那么他的domain自然而然就可以表述为recsys-feed-stream 但对于yarn上离线spark任务就有个问题,我们为了保证监控域的唯一性,将任务的domain设计为appName-appId,其中appName是有业务含义的, 比如这个应用用来跑spoor的数据备份的,那么appName会是spoor.DataBackup,每次跑的过程中yarn会分配一个appId,使得domain是动态的,这样规则描述的过程中,rule中的domain其实应该是domain的前缀

理解需求

刚才交代了下背景,那么我们的需求是希望一个ts过来之后迅速找到对应的规则,而且规则中的domain还是真实domain的最长前缀部分,这种场景下,我们需要兼顾两个方面,一方面是让能被命中规则的ts数据查找路径变短, 另一方面我们也要注意迅速过滤那些不在规则树中的数据,所以比较理想的是将domain部分构建成一颗前缀树,这样在查找的过程中效率会比较高效一点,另外由于现在规则的crud在spoor-judge-backend中,而规则引擎是一个spark streaming程序 ,也就是spoor-judge-stream,两者之间通过akka cluster进行通信,每次spoor-judge-backend遇到规则的变更会构建一棵新的ruleTree,然后发送给stream程序,所以这里我们就不需要考虑前缀树的更新这种case,暂时都是全量更新的

实现

定义Node

首先我们定义基础的数据结构,也就是Node

case class Node(children:TreeMap[Char,Node] = TreeMap.empty,char:Option[Char] = None)

实现查找

def find(tsDomain):Option[String] 

假设我们有一棵这样的Tree

trie

那么我们有以下测试用例

val node  = Node(???)
assert(node.find("ca") === Option("ca"))
assert(node.find("car") === Option("car"))
assert(node.find("cat") === Option("cat"))
assert(node.find("cat-1") === Option("cat"))
assert(node.find("cat-2") === Option("cat"))
assert(node.find("do") === Option("do"))
assert(node.find("dog-1") === Option("dog"))
assert(node.find("apple") === None)

也就是我们需要找出最匹配(最长)我们传入的tsDomain一个domainKey,如果完全不存在,就为None,其实这步查找之后我们会查询这个Node对应的domainKey下的metric set是否包含该ts的metric,不过这里简化了这个过程

那么我们就实现这个需求

我们的实现思路是首先确定递归终止的条件,递归终止的条件有下面两种:

  1. 当前的parent的children列表中不包含此时要查找的key
  2. 查找到tsDomain的最后一个char 接下来我们确定需要在递归中传递的参数,首先我们要将查找的index传下去,另外我们要将查找到的char的列表传下去,在递归终止的时候将这个列表mkString就是我们需要的,第三我们要将当前branch的Node传去,这样搜索的树会越来越小

具体代码如下

      def find(tsDomain: String): Option[String] = {
        @tailrec
        def findLoop(index: Int, chars: Seq[Char], currentParent: Node): Option[String] = {
          if (index == tsDomain.length - 1) {
            currentParent.children.get(tsDomain.charAt(index)).fold(if (chars.isEmpty) None else Some(chars.mkString)) { _ ⇒
              Some((chars :+ tsDomain.charAt(index)).mkString)
            }
          } else {
            currentParent.children.get(tsDomain.charAt(index)) match {
              case Some(parent) ⇒
                findLoop(index + 1, chars :+ tsDomain.charAt(index), parent)
              case None ⇒
                //finish find
                Some(chars.mkString)
            }
          }
        }
        findLoop(0, Seq.empty, this)
      }

实现树的构建

addTrie

比如我们要在已存在的Node中加入一个doctor的单词,那么首先我们要查找下d在不在,如果d不在就加一个children为空,char为Option('d')的Node 接着我们查看o在不在,重复上述步骤,直到单词词尾,在往下查找的过程中,我们需要将先从existsNodes中拿出头部,这是当前结点的parent结点, 构建currentNode,并且将currentNode放到Seq的头部,传给下一个char,直到全部构建完,这样我们就会有当前branch的一个Node的list,且子节点在父节点之前 接下来我们更新直接父节点的chilren map,这样最终将得到一棵新的Node

具体代码如下

      def add(key: String): Node = {
        def addChar(char: Char, node: Node): Node = {
          node.children.get(char) match {
            case None ⇒ node.copy(children = node.children + (char → Node(char = Some(char))))
            case _    ⇒ node
          }
        }
        (0 until key.length).foldLeft(Seq(this)) {
          case (nodes, i) ⇒
            val parent = nodes.head
            val currentChar = key(i)
            val currentNode = addChar(currentChar, parent).children(currentChar)
            currentNode +: nodes
        }.reduce { (childNode, parentNode) ⇒
          val char = childNode.char.get
          parentNode.copy(children = parentNode.children.updated(char, childNode))
        }
      }
cjuexuan commented 6 years ago

当然,这是一个比较直观的,其实Node还可以加一个word,表示当前的domainKey,这样的话,就可以不用传入Char的Seq,也会少一些对象的创建

cjuexuan commented 6 years ago

改进的代码

case class Node(children: TreeMap[Char, Node] = TreeMap.empty, word: Option[String] = None) {

      def find(tsDomain: String): Option[String] = {
        @tailrec
        def findLoop(index: Int, currentParent: Node): Option[String] = {
          if (index == tsDomain.length - 1) {
            currentParent.children.get(tsDomain.charAt(index)).fold(currentParent.word) { currentNode ⇒
              currentNode.word
            }
          } else {
            currentParent.children.get(tsDomain.charAt(index)) match {
              case Some(parent) ⇒
                findLoop(index + 1, parent)
              case None ⇒
                //finish find
                currentParent.word
            }
          }
        }
        findLoop(0, this)
      }

      def add(key: String): Node = {
        def addChar(currentChar: Char, parentNode: Node, currentWord: String): Node = {
          parentNode.children.get(currentChar) match {
            case None ⇒ parentNode.copy(children = parentNode.children + (currentChar → Node(word = Option(currentWord))))
            case _    ⇒ parentNode
          }
        }
        (0 until key.length).foldLeft(Seq(this)) {
          case (nodes, i) ⇒
            val parentNode = nodes.head
            val currentChar = key.charAt(i)
            val currentNode =
              addChar(currentChar, parentNode, key.substring(0, i + 1)).children(currentChar)
            currentNode +: nodes
        }.reduce { (childNode, parentNode) ⇒
          val char = childNode.word.get.last
          parentNode.copy(children = parentNode.children.updated(char, childNode))
        }
      }

    }