WeBankFinTech / WeDataSphere

WeDataSphere is a financial grade, one-stop big data platform suite.
661 stars 162 forks source link

【有奖征文】Linkis 新引擎实现分享 #11

Open wForget opened 4 years ago

wForget commented 4 years ago

Linkis 新引擎实现分享

在社区大佬的帮助下,我们完成了 0.11 版本的开发,实现了 ElasticSearch 和 Presto 引擎。具体的开发文档可以参考: Linkis引擎开发文档

执行引擎架构的选择

目前 Linkis 的架构可以分为两种,一种是 Entrance-EngineManger-Engine 的模式,一种是 Entrance 模式。统一执行服务的架构可以参考官方文档: Linkis-UJES设计文档

Entrance 服务作为执行的入口,主要负责任务的持久化工作,日志的输出,进行脚本的校验和变量替换,并与 Engine、EngineManager 服务交互,向可用的 Engine 发送执行任务的请求,或者向 EngineManager 发送启动 Engine 的请求。

EngineManager 服务主要负责 Engine 的启动,进行 Engine 请求资源的请求与释放,并持续监控 Engine 的状态。

Engine 服务负责任务的具体执行,包括了任务执行的一些初始化操作、任务脚本的切分、任务的执行、任务的进度监控和结果集的保存等工作。

Spark、Hive 引擎是 Entrance-EngineManger-Engine 模式实现,在这个模式中 Engine 作为 Spark 、Hive 任务的 Driver 端,向外暴露接口可持续的接受 Entrance 发来的请求,完成任务的执行。这个模式中不仅实现了多租户的任务隔离,还提供了单用户的引擎复用,尽量减少 Engine 的启动,大大提高了执行的效率。

上面的各个服务可以看到每个服务的职责非常的明确,不过多个服务也让整个的架构变的比较重,有一些轻量的执行没有必要通过 Entrance-EngineManger-Engine 模式进行实现。例如 Linkis JDBC 引擎的实现就是通过 Entrance 的模式。JDBC 引擎的职责就是作为 JDBC 连接的客户端向服务端发送请求,并进行连接的维护。JDBC 连接的维护是比较轻量级的,而且 JDBC 连接的复用也不是根据平台用户进行区分的,所以单独为每个用户启动一个引擎是没有必要的。

ElasticSearch 和 Presto 的客户端实际上就是 Http Client,所以 ElasticSearch 和 Presto 引擎的实现也应该是比较轻量的,最终我们实现的 ElasticSearch 和 Presto 引擎也是通过 Entrance 的模式实现的。

引擎资源控制

Linkis 的资源管理服务,用来管理用户、系统的资源和并发的控制,实现新的引擎需要考虑到引擎资源相关接口的实现。具体架构可参考:Linkis RM设计文档

Entrance-EngineManger-Engine 模式资源控制

Entrance-EngineManger-Engine 的模式,资源相关主要需要下面两个实现:

  1. EngineManger 注册资源 Linkis RM设计文档中可以看到,EngineManger 作为 Engine 资源的管理者,需要先向 ResourceManger 进行管理资源的注册。 Linkis 已经将 EngineManger 注册资源的逻辑进行了抽象,实现的时候只需要在 SpringConfiguration 中进行配置创建 resources 的 spring bean 对象,可以参考 SparkEngineManagerSpringConfiguration 的实现。
// com.webank.wedatasphere.linkis.enginemanager.configuration.SparkEngineManagerSpringConfiguration

@Configuration
class SparkEngineManagerSpringConfiguration {

  @Bean(Array("resources"))
  def createResource(): ModuleInfo = {
    val totalResource = new DriverAndYarnResource(
      new LoadInstanceResource(ENGINE_MANAGER_MAX_MEMORY_AVAILABLE.getValue.toLong,
        ENGINE_MANAGER_MAX_CORES_AVAILABLE.getValue, ENGINE_MANAGER_MAX_CREATE_INSTANCES.getValue),
      null
    )

    val protectedResource = new DriverAndYarnResource(
      new LoadInstanceResource(ENGINE_MANAGER_PROTECTED_MEMORY.getValue.toLong, ENGINE_MANAGER_PROTECTED_CORES.getValue,
        ENGINE_MANAGER_PROTECTED_INSTANCES.getValue),
      null
    )

    ModuleInfo(Sender.getThisServiceInstance, totalResource, protectedResource, ResourceRequestPolicy.DriverAndYarn)
  }
  // ...
}
  1. EngineResourceFactory 实现 EngineManager 创建 Engine 的时候需要先向 ResourceManger 去请求资源,所以新引擎需要提供 EngineResourceFactory 的实现,用来初始化创新 Engine 所需要的资源,再向 ResourceManger 进行请求。 Linkis 中提供了 AbstractEngineResourceFactory 的抽象,实现的时候只需要从 AbstractEngineResourceFactory 继承。具体可参考 SparkEngineResourceFactory 的实现:
// com.webank.wedatasphere.linkis.enginemanager.configuration.SparkEngineResourceFactory

@Component("engineResourceFactory")
class SparkEngineResourceFactory extends AbstractEngineResourceFactory {

  override protected def getRequestResource(properties: java.util.Map[String, String]): DriverAndYarnResource = {
    val executorNum = DWC_SPARK_EXECUTOR_INSTANCES.getValue(properties)
    new DriverAndYarnResource(
      new LoadInstanceResource(ByteTimeUtils.byteStringAsBytes(DWC_SPARK_DRIVER_MEMORY.getValue(properties) + "G"),
        DWC_SPARK_DRIVER_CORES,
        1),
      new YarnResource(ByteTimeUtils.byteStringAsBytes(DWC_SPARK_EXECUTOR_MEMORY.getValue(properties) * executorNum + "G"),
        DWC_SPARK_EXECUTOR_CORES.getValue(properties) * executorNum,
        0,
        DWC_QUEUE_NAME.getValue(properties))
    )
  }
}

Entrance 模式并发控制

Lnkis 中将 Engine 的实例数作为资源的一种,目前用户请求的并发是通过 Engine 的实例数进行控制的,那么在 Entrance 的模式下,就没有很好的对用户的并发进行控制。

在 ElasticSearch 和 Presto 的实现中,我们参考了 EngineManager 的资源控制,将并发数作为资源的一种,在 Entrance 启动时进行模块资源注册。将每个执行作为一个实例,执行发生时先进行资源的请求和锁定,执行完成后进行资源的释放,从而达到用户并发的控制。

主要包括了以下步骤:

  1. Entrance 注册并发资源 Entrance 注册并发资源,需要创建资源实例,将并发作为资源的一部分,然后配合 @EnableResourceManager 和 @RegisterResource 注解进行资源注册。
  // 定义资源
  @Bean(Array("resources"))
  def createResource(): ModuleInfo = {
    // 创建并发资源实例,分为总资源和受保护的资源
    val totalResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_MAX_JOB_INSTANCE.getValue)
    val protectResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_PROTECTED_JOB_INSTANCE.getValue)
    info(s"create resource for es engine totalResource is $totalResource, protectResource is $protectResource")
    ModuleInfo(Sender.getThisServiceInstance, totalResource, protectResource, ResourceRequestPolicy.Instance)
  }

  // 注册资源
  @RegisterResource
  def registerResources(): ModuleInfo = resources
  1. 执行前请求锁定资源 执行实例初始化前,先通过 ResourceManagerClient#requestResource 方法请求锁定并发实例资源。
rmClient.requestResource(requestEngine.user, requestEngine.creator, new InstanceResource(1)) match {
  case NotEnoughResource(reason) =>
    // 没有请求到资源,抛出异常
    throw EsEngineException(LogUtils.generateWarn(reason))
  case AvailableResource(ticketId) => {
    // 请求到资源,创建执行实例,并保存 ticketId 用于释放资源
    // ...
    // 当资源被实例化后,返回实际占用的资源总量
    rmClient.resourceInited(UserResultResource(ticketId, requestEngine.user), new InstanceResource(1))
  }
}
  1. 执行完成释放资源 执行完成后销毁执行实例,并通过 ResourceManagerClient#resourceReleased 方法释放锁定的资源。
// 使用 ticketId 释放对应的资源
rmClient.resourceReleased(UserResultResource(ticketId, requestEngine.user))

ElasticSearch 引擎的实现

下面是微众王和平大佬帮忙画的 ElasticSearch 引擎整体的架构图:

ElasticSearch引擎架构图

Linkis 新引擎的实现还是比较容易的,ElasticSearch 引擎的代码结构如下,整体的代码量也是比较少。主要包括了资源的配置、执行器的实例化和ElasticSearch请求与结果解析的相关代码。

Es引擎代码结构

  1. 资源注册 ElasticSearch 引擎需要考虑到用户请求的并发和 Entrance 整体并发的控制。 Entrance 启动时,需要对 Entrance 可用资源进行注册,主要包括了最大实例数和保护的阈值。在 EsSpringConfiguration 中生成资源的 bean 对象,并传入 EsEngineManager 进行注册,配置 @EnableResourceManager 和 @RegisterResource 就会自动进行注册。
// com.webank.wedatasphere.linkis.entrance.conf.EsSpringConfiguration
class EsSpringConfiguration extends Logging{

  @Bean(Array("resources"))
  def createResource(@Autowired rmClient: ResourceManagerClient): ModuleInfo = {
    // Clean up resources before creating resources to prevent dirty data when exiting abnormally (创造资源之前进行资源清理,防止异常退出时产生了脏数据)
    Utils.tryQuietly(rmClient.unregister())
    Utils.addShutdownHook({
      info("rmClient shutdown, unregister resource...")
      rmClient.unregister
    })
    val totalResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_MAX_JOB_INSTANCE.getValue)
    val protectResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_PROTECTED_JOB_INSTANCE.getValue)
    info(s"create resource for es engine totalResource is $totalResource, protectResource is $protectResource")
    ModuleInfo(Sender.getThisServiceInstance, totalResource, protectResource, ResourceRequestPolicy.Instance)
  }

}

// com.webank.wedatasphere.linkis.entrance.execute.EsEngineManager
@EnableResourceManager
class EsEngineManager(resources: ModuleInfo) extends EngineManager with Logging {

  @RegisterResource
  def registerResources(): ModuleInfo = resources

}
  1. 请求执行器 EsEngineRequester 启动一个执行器,用于任务的执行,通过 request 方法对传入的 job 生成一个执行的 EsEntranceEngine,请求时先向 ResourceManager 请求并锁定一个实例的资源,在 EsEntranceEngine 执行结束后会进行释放。
// com.webank.wedatasphere.linkis.entrance.execute.EsEngineRequester
class EsEngineRequester(groupFactory: GroupFactory, rmClient: ResourceManagerClient) extends EngineRequester {
  override def request(job: Job): Option[EntranceEngine] = job match {
    case entranceJob: EntranceJob => {
      val requestEngine = createRequestEngine(job);
      // request resource manager
      rmClient.requestResource(requestEngine.user, requestEngine.creator, new InstanceResource(1)) match {
        case NotEnoughResource(reason) =>
          throw EsEngineException(LogUtils.generateWarn(reason))
        case AvailableResource(ticketId) => {
          val engine = new EsEntranceEngine(idGenerator.incrementAndGet(), new util.HashMap[String, String](requestEngine.properties)
            , () => {rmClient.resourceReleased(UserResultResource(ticketId, requestEngine.user))})
          engine.setGroup(groupFactory.getOrCreateGroup(getGroupName(requestEngine.creator, requestEngine.user)))
          engine.setUser(requestEngine.user)
          engine.setCreator(requestEngine.creator)
//          engine.updateState(ExecutorState.Starting, ExecutorState.Idle, null, null)
          engine.setJob(entranceJob)
          engine.init()
          executorListener.foreach(_.onExecutorCreated(engine))
          rmClient.resourceInited(UserResultResource(ticketId, requestEngine.user), new InstanceResource(1))
          Option(engine)
        }
      }
    }
    case _ => None
  }
}
// com.webank.wedatasphere.linkis.entrance.execute.EsEntranceEngine
class EsEntranceEngine(id: Long, properties: JMap[String, String], resourceRelease: () => Unit) extends EntranceEngine(id) with SingleTaskOperateSupport with SingleTaskInfoSupport {
  override def close(): Unit = {
    try {
      this.job.setResultSize(0)
      this.engineExecutor.close
      // 释放资源
      resourceRelease()
      // ......
}
  1. 任务执行 EsEntranceEngine 是 com.webank.wedatasphere.linkis.entrance.execute.EntranceEngine 的实现,进行脚本的执行。在这里抽出一层 EsEngineExecutor 作为 Es 任务的具体执行。EsEntranceEngine 则负责 EsEngineExecutor 的初始化、脚本解析切分等实现。
class EsEntranceEngine(id: Long, properties: JMap[String, String], resourceRelease: () => Unit) extends EntranceEngine(id) with SingleTaskOperateSupport with SingleTaskInfoSupport {
  private var engineExecutor: EsEngineExecutor = _
  // ...
  override def execute(executeRequest: ExecuteRequest): ExecuteResponse   // ...
  protected def executeLine(code: String): ExecuteResponse = this.engineExecutor.executeLine(code, storePath, s"_$codeLine")

}
  1. ElasticSearch 脚本执行 entrance.executor 包中就是 ElasticSearch 客户端的封装、请求的封装和结果的解析等相关代码。 ElasticSearch 客户端封装在 EsClient 中,通过 EsClientFactory 进行实例化,并将 datasourceName 作为唯一 Key 进行缓存。 EsEngineExecutorImpl 是 EsEngineExecutor 的实现,用于任务的执行。 ResponseHandlerImpl 用于结果的处理,会根据 ElasticSearch 的返回类型进行反序列化,并保存为 Linkis 的 ResultSet。

DataSource 路由

在与微众大佬的讨论交流中得知后面 Linkis 的架构将会引入 DataSource 的概念,DataSource 模块维护引擎的连接信息和集群等信息,可以减少一些数据源运行配置,方便数据源配置和权限管理,为数据平台提供元数据信息,并可根据 DataSource 进行路由实现多集群的路由。

在 Linkis-0.11.0版本中添加了 linkis-gateway-ujes-datasource-ruler 模块,作为一个 Gateway 插件的形式简单实现了,请求和 Entrance 的路由。

linkis-gateway-ujes-datasource-ruler 模块的实现

抽象出 EntranceGatewayRouterRuler 接口用于执行路由规则,在 Gateway 模块的 EntranceGatewayRouter 中注入 EntranceGatewayRouterRuler 实例。

@Component
class EntranceGatewayRouter extends AbstractGatewayRouter {

  @Autowired(required = false)
  private var rules: Array[EntranceGatewayRouterRuler] = _

  override def route(gatewayContext: GatewayContext): ServiceInstance = {
    gatewayContext.getGatewayRoute.getRequestURI match {
      case EntranceGatewayRouter.ENTRANCE_REGEX(_) =>
        // ...
        serviceId.map(applicationName => {
          rules match {
            case array: Array[EntranceGatewayRouterRuler] => array.foreach(_.rule(applicationName, gatewayContext))
            case _ =>
          }
          ServiceInstance(applicationName, gatewayContext.getGatewayRoute.getServiceInstance.getInstance)
        }).orNull
      case _ => null
    }
  }

}

linkis-gateway-ujes-datasource-ruler 模块,主要是做了一个 DataSource 和 Entrance Instance 的映射,并保存在 Mysql 中。DatasourceGatewayRouterRuler 实现了具体的路由策略,DatasourceMapService 接口维护 DataSource 映射。

// 维护 DataSource 映射的接口
public interface DatasourceMapService {

    String getInstanceByDatasource(String datasourceName);

    long countByInstance(String instance);

    String insertDatasourceMap(String datasourceName, String instance, String serviceId);

}

// EntranceGatewayRouterRuler 的实现类,执行具体的路由逻辑
class DatasourceGatewayRouterRuler extends EntranceGatewayRouterRuler with Logging {

  // 路由的方法
  override def rule(serviceId: String, gatewayContext: GatewayContext): Unit = if(StringUtils.isNotBlank(gatewayContext.getRequest.getRequestBody)) {
    // 从请求中获取 datasourceName
    val datasourceName = getDatasourceName(gatewayContext.getRequest.getRequestBody)
    if (StringUtils.isBlank(datasourceName)) return
    debug(s"datasourceName: $datasourceName")
    // 通过 datasourceName 获取映射
    datasourceMapService.getInstanceByDatasource(datasourceName) match {
      case i: String if StringUtils.isNotBlank(i) => 
        // 存在映射直接返回 Instance
        gatewayContext.getGatewayRoute.getServiceInstance.setInstance(i)
      case _ => {
        // 不存在映射时,先获取 Instance 列表,并根据已经存在映射的数据按照从小到大排序,获取最少映射的 Instance,插入 DataSource 映射并返回
        val newInstance = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId)
          .map(item => (item, datasourceMapService.countByInstance(item.getInstance)))
          .sortBy(_._2).map(_._1.getInstance).headOption match {
            case Some(item) => datasourceMapService.insertDatasourceMap(datasourceName, item, serviceId)
            case None => null
          }
        debug(s"newInstance: $newInstance")
        if (StringUtils.isNotBlank(newInstance)) {
          gatewayContext.getGatewayRoute.getServiceInstance.setInstance(newInstance)
        }
      }
    }
  }

}