Open cjuexuan opened 7 years ago
背景: spark streaming中计算的结果需要落盘到es,一个batch中的数据还会按照类型不同存储到不同的index中
现有框架:
elastic-hadoop这个项目在处理单一index的场景下还是比较容易上手的,但这个项目有两个不足,第一个是数据还要再次group后放到不同的index,另一个问题是这个项目用的是9200这个http端口
spark的官方文档中推荐了一种不错的方法,就是利用资源池,其实资源池最大的好处是对象池化,避免无谓的gc,那我们将我们的代码也改造下,我们可以选择用common-pool2实现一个我们sink的池化,在使用我们sink的时候从池中拿,不用的时候还回去就好,由于我们sink其实类似dao层,是对EsClient的wrapper,那么最好的方式就是实现java的Close接口,然后在销毁对象的时候将我们的esClient close
sink接口
trait TimeSeriesSink extends Closeable with Serializable { def write(ts: StructedTimeSeries) }
sink实现
import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri} import com.ximalaya.spoor.stream.bean.{GaugeTimeSeries, StructedTimeSeries} import com.ximalaya.spoor.stream.builder.IndexBuilder import com.ximalaya.spoor.stream.sink.TimeSeriesSink import org.elasticsearch.common.settings.Settings import com.ximalaya.spoor.common.bean.MetricType._ /** * @author todd.chen at 26/03/2017 08:01. * email : todd.chen@ximalaya.com */ class TimeSeriesSinkImpl(node: String, port: Int, clusterName: String, indexPrefix: String) extends TimeSeriesSink { private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass) logger.info("tsSinker init") private val settings = Settings.builder().put("cluster.name", clusterName).build() private val client = ElasticClient.transport(settings, ElasticsearchClientUri(node, port)) private val indexBuilder = new IndexBuilder(indexPrefix) override def write(ts: StructedTimeSeries): Unit = { ts match { case gauge: GaugeTimeSeries ⇒ client.execute { index into indexBuilder.getIndex(gauge.interval, Gauge) source gauge.toJson } case _ ⇒ //TODO other type sink } } override def close(): Unit = { client.close() } }
池化代码:
import com.ximalaya.spoor.stream.bean.EsPoolConfig import com.ximalaya.spoor.stream.sink.TimeSeriesSink import com.ximalaya.spoor.stream.sink.impl.TimeSeriesSinkImpl import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool} import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject} /** * @author todd.chen at 26/03/2017 15:08. * email : todd.chen@ximalaya.com */ class PooledSinkFactory extends BasePooledObjectFactory[TimeSeriesSink] with Serializable { private val esConf = EsPoolConfig.getEsConf private val clusterName = esConf.clusterName private val indexPrefix = esConf.indexPrefix private val node = esConf.node private val port = esConf.port override def create(): TimeSeriesSink = { new TimeSeriesSinkImpl(node, port, clusterName, indexPrefix) } override def wrap(t: TimeSeriesSink): PooledObject[TimeSeriesSink] = { new DefaultPooledObject[TimeSeriesSink](t) } override def destroyObject(p: PooledObject[TimeSeriesSink]): Unit = { p.getObject.close() super.destroyObject(p) } } object SinkPool extends GenericObjectPool[TimeSeriesSink](new PooledSinkFactory) with Serializable { private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass) private def initPool(): Unit = { val conf = EsPoolConfig.getEsConf logger.debug("es pool conf:{}", conf) this.setMaxIdle(conf.maxIdle) this.setMaxTotal(conf.maxConn) logger.info("init es Pool") } initPool() }
代码使用
dStream.mapWithState(mapWithStateFunc).foreachRDD { rdd ⇒ rdd.foreachPartition { structedTs ⇒ val sink = SinkPool.borrowObject() structedTs.flatten.foreach(sink.write) SinkPool.returnObject(sink) } }
spark项目中common pool的简单使用
背景: spark streaming中计算的结果需要落盘到es,一个batch中的数据还会按照类型不同存储到不同的index中
现有框架:
elastic-hadoop这个项目在处理单一index的场景下还是比较容易上手的,但这个项目有两个不足,第一个是数据还要再次group后放到不同的index,另一个问题是这个项目用的是9200这个http端口
改造
spark的官方文档中推荐了一种不错的方法,就是利用资源池,其实资源池最大的好处是对象池化,避免无谓的gc,那我们将我们的代码也改造下,我们可以选择用common-pool2实现一个我们sink的池化,在使用我们sink的时候从池中拿,不用的时候还回去就好,由于我们sink其实类似dao层,是对EsClient的wrapper,那么最好的方式就是实现java的Close接口,然后在销毁对象的时候将我们的esClient close
相关代码
sink接口
sink实现
池化代码:
代码使用