Open cjuexuan opened 7 years ago
最近主要在搞spark的监控系统,由于spark的metricSystem默认固定依赖了配置文件metrics.properties,所以开始用户一定要提供这个配置文件,这一点还是很恶心的,文件越多越容易出错,这点毋庸置疑,所以这次hack的初衷就是初始化的时候由我们自己的类指定,而非用文件的方式进行交互
metrics.properties
全类名org.apache.spark.metrics.MetricsConfig,这个类主要是将sparkConf和metric system进行桥接,以及为web ui之类的开启一些默认的sink(servlet)
org.apache.spark.metrics.MetricsConfig
这个类也比较简单,主入口就是initialize方法,
spark.metrics.conf
spark.metrics.conf.
*
此时思路也有了,很简单,就是把用户spark.metrics.conf这个key的内容读进来,追加到临时文件中,然后把我们那个很固定的指定sink的那一行也set进去,然后再把我们临时文件的绝对路径作为value设置进去,不过既然看了,我们也顺便把metric system这个类完整过一遍
全类名是org.apache.spark.metrics.MetricsSystem,进去会先去new个MetricsConfig,然后调用下metricsConfig的initialize方法
org.apache.spark.metrics.MetricsSystem
这个类里面还有些方法,主入口是start,stop和report方法
start method:
CodegenMetrics
HiveCatalogMetrics
MetricsSystem.SOURCE_REGEX
stop和report这两个方法没啥好说的,就是把sink关掉和调用sink的report方法,输出出去
def init(sparkConf: SparkConf): Unit = { val metricKey = "spark.metrics.conf" val file = File.createTempFile("metrics", "properties") val sinkName = classOf[FootprintSink].getName if (sparkConf.contains(s"$metricKey")) { val fileName = sparkConf.get(metricKey) logger.warn(s"found exists spark.metrics.conf:$fileName,append to new metrics.properties") writeResource2File(file, fileName) } else { writeResource2File(file, "metrics.properties") } FileUtils.writeStringToFile(file, s"*.sink.footprint.class=$sinkName") sparkConf.set(metricKey, file.getAbsolutePath) file.deleteOnExit() } private def writeResource2File(file: File, resourceName: String) = { val url = getClass.getResource(resourceName) if (url != null) { FileUtils.writeStringToFile(file, IOUtils.toString(url)) } }
这样用户对接的过程中就不依赖文件了,直接用我们的Util初始化一下就好了,简单了一点,也不容易出错了
更好的是文件也不生成,直接走spark的config
key的前缀为spark.metrics.conf.*.sink,就可以直接被metric system拿来反射你自己实现的sink
spark Metrics System hack
最近主要在搞spark的监控系统,由于spark的metricSystem默认固定依赖了配置文件
metrics.properties
,所以开始用户一定要提供这个配置文件,这一点还是很恶心的,文件越多越容易出错,这点毋庸置疑,所以这次hack的初衷就是初始化的时候由我们自己的类指定,而非用文件的方式进行交互MetricsConfig
全类名
org.apache.spark.metrics.MetricsConfig
,这个类主要是将sparkConf和metric system进行桥接,以及为web ui之类的开启一些默认的sink(servlet)这个类也比较简单,主入口就是initialize方法,
spark.metrics.conf
,value是metrics相关配置的文件名,如果没有就用默认的metrics.properties
作为文件名spark.metrics.conf.
开头的也设置进去,这也是spark内部系统中那些需要被监控的组件注入进去的入口,感兴趣可以设个断点看一下*
的存在,如果没key的都会放到*
这个key里面去,但如果这个指标还属于其他key,就移到其他key中去此时思路也有了,很简单,就是把用户
spark.metrics.conf
这个key的内容读进来,追加到临时文件中,然后把我们那个很固定的指定sink的那一行也set进去,然后再把我们临时文件的绝对路径作为value设置进去,不过既然看了,我们也顺便把metric system这个类完整过一遍spark metrics system
全类名是
org.apache.spark.metrics.MetricsSystem
,进去会先去new个MetricsConfig,然后调用下metricsConfig的initialize方法这个类里面还有些方法,主入口是start,stop和report方法
start method:
CodegenMetrics
和HiveCatalogMetrics
这两个MetricsSystem.SOURCE_REGEX
这个正则的挑选出来,通过class这个key拿到类名,然后调用反射将class初始化好,加到内部的ArrayBuffer里stop和report这两个方法没啥好说的,就是把sink关掉和调用sink的report方法,输出出去
最后代码
这样用户对接的过程中就不依赖文件了,直接用我们的Util初始化一下就好了,简单了一点,也不容易出错了