teeyog / blog

My Blog
76 stars 24 forks source link

Standalone模式下Master、WorKer启动流程 #12

Open teeyog opened 6 years ago

teeyog commented 6 years ago

本文基于spark2.1进行解析

前言

Spark作为分布式的计算框架可支持多种运行模式:

  • 本地运行模式 (单机)
  • 本地伪集群运行模式(单机模拟集群)
  • Standalone Client模式(集群)
  • Standalone Cluster模式(集群)
  • YARN Client模式(集群)
  • YARN Cluster模式(集群)

而Standalone 作为spark自带cluster manager,需要启动Master和Worker守护进程,本文将从源码角度解析两者的启动流程。Master和Worker之间的通信使用的是基于netty的RPC,Spark的Rpc推荐看深入解析Spark中的RPC

Master 启动

启动Master是通过脚本start-master.sh启动的,里面实际调用的类是:

org.apache.spark.deploy.master.Master

看看其main方法:

def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    // 创建RpcEnv,启动Rpc服务
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    //阻塞等待
    rpcEnv.awaitTermination()
  }

main方法先获取配置参数创建SparkConf,通过startRpcEnvAndEndpoint启动一个RPCEnv并创建一个Endpoint,调用awaitTermination来阻塞服务端监听请求并且处理。下面细看startRpcEnvAndEndpoint方法:

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    // 创建RpcEnv
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    //通过rpcEnv 创建一个Endpoint
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }

首先创建了RpcEnv,RpcEnv是整个Spark RPC的核心所在,RPCEndpoint定义了处理消息的逻辑,被创建后就被RpcEnv所管理,整个生命周期顺序为onStart,receive,onStop,其中receive可以被同时调用,ThreadSafeRpcEndpoint中的receive是线程安全的,同一时刻只能被一个线程访问。

该方法中向rpcEnv 注册的Endpoint是Master(继承了ThreadSafeRpcEndpoint),Master的构造器中创建了保存各种信息的变量。

 ...
  //一个HashSet用于保存WorkerInfo
  val workers = new HashSet[WorkerInfo]
 //一个HashSet用于保存客户端(SparkSubmit)提交的任务
  val apps = new HashSet[ApplicationInfo]
 //等待调度的App
  val waitingApps = new ArrayBuffer[ApplicationInfo]
 //保存DriverInfo
  val drivers = new HashSet[DriverInfo]
 ...

由于Master是一个Endpoint并被RpcEnv管理,需要先执行生命周期的onStart方法:

override def onStart(): Unit = {
   ...
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CheckForWorkerTimeOut)
      }
    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
   ...
  }

向线程池中加入了一个线程,每隔WORKER_TIMEOUT_MS(默认60秒)时间去检测是否有Worker超时,其实就是向自己发送了一个CheckForWorkerTimeOut事件,稍后再细讲。

Worker启动

多个节点上的Worker是通过脚本start-slaves.sh启动,底层调用的类是:

org.apache.spark.deploy.worker.Worker

看看其main方法:

def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }

和Master类似,也是先获取配置参数创建SparkConf,接着调用startRpcEnvAndEndpoint启动一个RPCEnv并创建一个Endpoint,调用awaitTermination来阻塞服务端监听请求并且处理。

 def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {

    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
    rpcEnv
  }

这里是通过new了一个Worker实例来作为Endpoint并注册到RpcEnv中,Worker的构造器中初始化了心跳超时时间为Master端的1/4及其他变量

Worker向Master注册

Worker需要根据生命周期执行onStart()方法:

override def onStart() {
   ...
    registerWithMaster()
   ...
  }

在onStart()方法中调用了registerWithMaster来向Master来注册自己:

private def registerWithMaster() {
    // onDisconnected may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    registrationRetryTimer match {
      case None =>
        // 是否已注册
        registered = false
        // 尝试向所有Master注册自己
        registerMasterFutures = tryRegisterAllMasters()
        // 尝试连接次数
        connectionAttemptCount = 0
        // 网络或者Master故障的时候就需要重新注册自己
        // 注册重试次数超过阈值则直接退出
        registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }

registrationRetryTimer第一次调用肯定为None,通过tryRegisterAllMasters向Master注册自己,后面还启动了一个线程在有限次数内去尝试重新注册(网络或者Master出现故障是需要重新注册)。这里先看tryRegisterAllMasters方法是如何向Master注册的:

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    masterRpcAddresses.map { masterAddress =>
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = {
          try {
            logInfo("Connecting to master " + masterAddress + "...")
            val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            registerWithMaster(masterEndpoint)
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        }
      })
    }
  }

这里调用了rpcEnv.setupEndpointRef,RpcEndpointRef 是 RpcEnv 中的 RpcEndpoint 的引用,是一个序列化的实体以便于通过网络传送或保存以供之后使用。一个 RpcEndpointRef 有一个地址和名字。可以调用 RpcEndpointRef 的 send 方法发送异步的单向的消息给对应的 RpcEndpoint 。

这里整段代码意思即是:遍历所有masterRpcAddresses,调用registerWithMaster方法,并传入master端的RpcEndpoint引用RpcEndpointRef ,继续看看registerWithMaster方法:

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, workerWebUiUrl))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }

通过RpcEndpointRef 和Master建立通信向Master发送RegisterWorker消息,并带入workerid,host,Port,cores,内存等参数信息,并有成功或者失败的回调函数稍后讲解。

Master 接收Worker注册

在Master中通过receiveAndReply方法处理各种需要回应的事件(单向消息通过receive),对于Worker注册消息RegisterWorker处理逻辑:

case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      // 当前Master处于STANDBY
      if (state == RecoveryState.STANDBY) {
        context.reply(MasterInStandby)
      // Worker已经注册过了
      } else if (idToWorker.contains(id)) {
        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        // 根据Worker注册信息为Worker创建WorkerInfo
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerWebUiUrl)
        if (registerWorker(worker)) {
          // 持久化记录Worker信息
          persistenceEngine.addWorker(worker)
          // 向Worker回复注册成功消息
          context.reply(RegisteredWorker(self, masterWebUiUrl))
          // 有了新的Worker,资源新增,为等待的app进行调度
          schedule()
        } else {
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          // 向Worker回复注册失败消息
          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }
  1. 若当前Master处于STANDBY状态,直接返回MasterInStandby消息
  2. 若Worker已经注册过了,直接返回RegisterWorkerFailed消息
  3. 根据Worker注册信息为Worker创建WorkerInfo,调用registerWorker方法进行注册:
    • 若注册成功则持久化这个Worker信息,并向Worker回复注册成功消息,另外,多了一个Worker意味着资源的增加会通过schedule()去调度等待调度的apps。
    • 若注册失败,则直接向Worker回复注册失败消息。

那是怎么判断是否注册成功呢?跟进registerWorker方法:

private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      workers -= w
    }
    // 获取新worker的workerAddress 
    val workerAddress = worker.endpoint.address
    if (addressToWorker.contains(workerAddress)) {
      // 根据workerAddress 获取以前注册的老Worker
      val oldWorker = addressToWorker(workerAddress)
      // 若为UNKNOWN则说明是Master 处于recovery,Worker处于恢复中
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // 移除老Worker,接受新注册的Worker
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }
    // 跟新变量
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    true
  }

遍历所有管理的Worker,若有与新注册的Worker相同的host,port且处于Dead(超时)状态的Worker则直接从workers中移除。若管理的addressToWorker已经存在新注册的Worker一样的workerAddress,则获取老Worker,若状态是UNKNOWN说明Master 处于recovery,Worker正处于恢复中,则将老Worker移除,将新Worker直接加入并成功返回,若老Worker是其他状态则说明已经重复注册了,返回失败。

Worker接收Master注册反馈消息

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, workerWebUiUrl))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }

在Worker向Master注册的时候就是调用的这个registerWithMaster方法,后随有回调方法处理结果,通过handleRegisterResponse来处理各种类型的反馈消息:

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
    msg match {
      // 成功注册
      case RegisteredWorker(masterRef, masterWebUiUrl) =>
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        // 标记成功注册
        registered = true
        // 跟新映射,删除其他的registeration retry
        changeMaster(masterRef, masterWebUiUrl)
        // 向Master发送心跳
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
       ...
      // 注册失败,直接退出进程
      case RegisterWorkerFailed(message) =>
        if (!registered) {
          logError("Worker registration failed: " + message)
          System.exit(1)
        }
      // Master不是处于Active的Master,忽略
      case MasterInStandby =>
        // Ignore. Master not yet ready.
    }
  }
  1. 当注册Worker失败收到RegisterWorkerFailed消息,则退出。
  2. 当注册的Master处于Standby状态,直接忽略。
  3. 注册Worker成功返回RegisteredWorker消息时,先标记注册成功,然后通过changeMaster更改一些变量(如activeMasterUrl,master,connected等),并删除当前其他正在重试的注册。然后新建了一个task到线程池执行,该线程每隔HEARTBEAT_MILLIS时间向自己发送一个SendHeartbeat消息,在消息处理方法receive里面可看到消息处理方法,即向Master发送心跳:
    case SendHeartbeat =>
      if (connected) { sendToMaster(Heartbeat(workerId, self)) }

Master 接收心跳

case Heartbeat(workerId, worker) =>
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          if (workers.map(_.id).contains(workerId)) {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " Asking it to re-register.")
            worker.send(ReconnectWorker(masterUrl))
          } else {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " This worker was never registered, so ignoring the heartbeat.")
          }
      }

master端获取对应的workerInfo,若有则跟新上次获取心跳时间lastHeartbeat,若没有则向Worker发送需要重新建立连接的消息。

Master 检测Worker心跳超时

另外,由上文可知在Master的生命周期onStart里专门启动了一个线程检查worker是否超时,看看Master是如何处理的:

case CheckForWorkerTimeOut =>
      timeOutDeadWorkers()

private def timeOutDeadWorkers() {
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val currentTime = System.currentTimeMillis()
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
    for (worker <- toRemove) {
      if (worker.state != WorkerState.DEAD) {
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, WORKER_TIMEOUT_MS / 1000))
        removeWorker(worker)
      } else {
        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
        }
      }
    }
  }

遍历所有管理的Worker,若上次心跳时间离现在已经超过超时时间则判断为超时,将从worker列表里移除。