// JobGenerator.generateJobs(time): invoked from the event loop for every GenerateJobs(time) event.
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
// DStreamGraph.generateJobs(time): one job per registered output stream.
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
// ReceiverInputDStream.compute(validTime): builds a BlockRDD from the blocks allocated to this batch.
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
Preface
Spark Streaming jobs are generated dynamically by the JobGenerator, once every batchDuration. Each batch submits one JobSet, because a single batch may contain several output operations. The sections below walk through the flow.
Entry point
When the JobGenerator is initialized, it creates a timer that, every batchDuration, posts a GenerateJobs(new Time(longTime)) message to the eventLoop. The eventLoop's event handler then calls the generateJobs(time) method (the first snippet at the top of this post).
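A rough sketch of how this timer and event handling are wired inside JobGenerator (paraphrased from the Spark source; names and details may differ slightly between versions):

// A RecurringTimer fires every batchDuration and posts a GenerateJobs event.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

// The event loop hands every posted event to processEvent, which dispatches
// GenerateJobs(time) to the generateJobs(time) method shown above.
private def processEvent(event: JobGeneratorEvent) {
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case _ => // ClearMetadata / ClearCheckpointData handling omitted here
  }
}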
Allocating blocks for the current batch time
First, receiverTracker.allocateBlocksToBatch(time) is called to allocate blocks to the current batch time. This ends up in the allocateBlocksToBatch method of receivedBlockTracker, the block bookkeeper inside receiverTracker. That method drains, for every streamId, the unallocated blocks from streamIdToUnallocatedBlockQueues; these queues hold the block information that each supervisor reports to the receiverTracker after it has stored a block (see the earlier post on how the ReceiverTracker produces and stores data for details). Once the unallocated blockInfos of all streamIds have been collected, they are recorded in timeToAllocatedBlocks: Map[Time, AllocatedBlocks], which is consulted later when the RDDs are generated.
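A simplified sketch of receivedBlockTracker.allocateBlocksToBatch, with the write-ahead-log write omitted (the field names follow the Spark source but may differ slightly by version):

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // Drain the queue of not-yet-allocated blocks for every known streamId.
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    // Record the allocation so that compute() can later look it up by batch time
    // via receiverTracker.getBlocksOfBatch(batchTime).
    timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
    lastAllocatedBatchTime = batchTime
  }
}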
Generating jobs for the current batch time
The generateJobs method of DStreamGraph is then called to generate the jobs for the current batch time. One outputStream corresponds to one job: all outputStreams are traversed and a job is generated for each of them. To build a job, the RDD for the given time is obtained first and then passed to the foreachFunc method, which was handed in through the constructor. Taking print() as an example, the foreachFunc built there is the function that is eventually submitted together with the RDD as the job body: it calls take() on the RDD and prints the result, so the action that actually triggers computation lives inside this function (see the sketch below).
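As a sketch of this step, here is roughly what print() registers and how ForEachDStream turns it into a Job (paraphrased and trimmed; the real code also prints a header block and sets call-site properties):

// DStream.print(num): register an output operation whose foreachFunc takes the
// first elements of the batch's RDD and prints them; rdd.take() is the action
// that actually triggers the computation.
def print(num: Int): Unit = {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      println(s"Time: $time")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

// ForEachDStream.generateJob(time): fetch (or compute) the parent RDD for this
// batch time and wrap the call to foreachFunc(rdd, time) as the Job's jobFunc.
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => foreachFunc(rdd, time)
      Some(new Job(time, jobFunc))
    case None => None
  }
}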
Now let's see how the RDD is obtained. Every DStream has a generatedRDDs: Map[Time, RDD[T]] field that caches the RDD for each batch time; if no entry is found, the RDD is produced by the compute() method. For a ReceiverInputDStream, which starts a Receiver on an executor to receive data, compute() asks the receiverTracker for the blocks that belong to this batch. As analyzed above, the previously unallocated blocks of every streamId have already been allocated and stored in timeToAllocatedBlocks: Map[Time, AllocatedBlocks]; under the hood the blockInfos are fetched from that map, and createBlockRDD(validTime, blockInfos) then creates the RDD from the block ids. Finally, a jobFunc is built from this RDD and foreachFunc, and a Job is created and returned.
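A simplified sketch of DStream.getOrCompute, which backs the generatedRDDs lookup described above (checkpointing, persistence and call-site handling are omitted):

private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
  // Return the cached RDD for this batch time, or compute and cache it.
  generatedRDDs.get(time).orElse {
    if (isTimeValid(time)) {
      val rddOption = compute(time)   // e.g. ReceiverInputDStream.compute shown above
      rddOption.foreach { newRDD => generatedRDDs.put(time, newRDD) }
      rddOption
    } else {
      None
    }
  }
}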
Wrapping the jobs into a JobSet and submitting it for execution
Each outputStream corresponds to one Job, so the result is a sequence of jobs. A JobSet is created for them and submitted via jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)). The JobSet is then executed by the jobExecutor, a thread pool whose size defaults to 1 and can be configured through spark.streaming.concurrentJobs, i.e. how many batches may be processed concurrently. The handler class JobHandler calls Job.run(), which executes the jobFunc built earlier.
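A sketch of the submission side in JobScheduler (paraphrased; listener notifications and error handling are trimmed):

// The jobExecutor's size is spark.streaming.concurrentJobs (default 1), i.e. how
// many batches may be processed in parallel.
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    jobSets.put(jobSet.time, jobSet)
    // Each job is wrapped in a JobHandler and handed to the thread pool.
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

// JobHandler.run() ultimately calls job.run(), which invokes the jobFunc built in
// ForEachDStream.generateJob (e.g. the take-and-print function of print()).
private class JobHandler(job: Job) extends Runnable {
  def run() {
    job.run()
  }
}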