// get&put 远程block的时候就是通过blockTransferService 完成的
val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
blockManagerPort, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
private[spark] val memoryStore =
new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
memoryManager.setMemoryStore(memoryStore)
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
}
subDirs(dirId)(subDirId) = newDir
newDir
}
}
new File(subDir, filename)
}
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
if (contains(blockId)) {
throw new IllegalStateException(s"Block $blockId is already present in the disk store")
}
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val fileOutputStream = new FileOutputStream(file)
var threwException: Boolean = true
try {
writeFunc(fileOutputStream)
threwException = false
} finally {
try {
Closeables.close(fileOutputStream, threwException)
} finally {
if (threwException) {
remove(blockId)
}
}
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName,
Utils.bytesToString(file.length()),
finishTime - startTime))
}
概述
BlockManager是spark自己的存储系统,RDD-Cache、 Shuffle-output、broadcast 等的实现都是基于BlockManager来实现的,BlockManager也是分布式结构,在driver和所有executor上都会有blockmanager节点,每个节点上存储的block信息都会汇报给driver端的blockManagerMaster作统一管理,BlockManager对外提供get和set数据接口,可将数据存储在memory, disk, off-heap。
blockManager的创建与注册
blockManagerMaster和blockManager都是在构造SparkEnv的时候创建的,Driver端是创建SparkContext的时候创建SparkEnv,Executor端的SparkEnv是在其守护进程CoarseGrainedExecutorBackend创建的时候创建的,下面看blockManager是怎么在sparkEnv中创建的:
构造blockManagerMaster的时候在Driver端是创建了一个BlockManagerMasterEndpoint并注册到了rpcEnv中,而在executor端是获取到了 Driver端BlockManagerMasterEndpoint的引用 BlockManagerMasterRef,以便后面的通信。随后都创建了自己blockManager,创建blockManager的时候都创建了BlockManagerSlaveEndpoint。
blockManager创建后还不能直接使用,接着都会调用blockManager的initialize方法,通过与master通信向master进行注册,master收到消息后会将blockManager的信息存到blockManagerInfo的map中,key为blockManagerId(保存着executorId、host、post等信息),value为BlockManagerInfo(保存着具体的block状态信息及 BlockManagerSlaveEndpoint 的引用 ),注册完后就可以真正干活了。
master与slave间的消息传递
slave -> master
master -> slave
存储
在blockManager被创建的时候创建了MemoryStore和DiskStore两个对象用以存取block。
DiskStore
diskSore就是基于磁盘来存储数据的,diskStore有一个成员DiskBlockManager,其主要作用就是逻辑block和磁盘block的映射,block的blockId对应磁盘文件中的一个文件。
通过blockId的hash值和localDirs的个数求余来决定在哪个localDir下创建文件,这里的localDirs是可配置的多个目录,可通过SPARK_LOCAL_DIRS进行设置,多个目录以逗号分割,配置多个目录的目的是可分散磁盘的读写压力。另外spark在每个localDir中创建了64(可通过spark.diskStore.subDirectories配置)个子目录来分散文件,子文件的选择也是通过blockId的hash值来计算的。
在diskStore中的putButes方法就是真正写数据到磁盘的方法:
接收一个blockId和要写的字节数据,通过blockId获取到要写的具体文件并得到对应的文件输出流,将该bytes直接write这个流里,完成写文件。
diskStore还有一个重要的方法getBytes方法,即读磁盘文件的方法,通过blockId获取到对应的磁盘文件,以字节 buffer 的形式返回。
此外还有查询blockId对应文件的大小、是否存在blockId对应的文件、删除blockId对应的文件等方法。
MemoryStore
memorySore是基于JVM的堆内存来存储数据,MemoryStore内部维护了一个hash map来管理所有的block,以block id为key将block存放到hash map中。
基于内存存储不像存磁盘那么简单,毕竟内存非常有限,memorySore有个专门管理内存的成员memoryManager,spark的内存管理详情参考内存管理 MemoryManager 解析。
放内存就意味着要有足够的内存来放,不然会导致OOM。
若blockId 对应的数据以bytes数据的方式存放,则可根据其size大小来申请内存存放,若不能申请足够的内存则说明放入内存失败。对应的方法是:
若blockId 对应的数据通过迭代器的方式写入内存,则无法提前知道其数据大小,这里的做法是逐步展开迭代器来检查是否还有空余内存。如果迭代器顺利展开了,那么用来展开迭代器的内存直接转换为存储内存,而不用再去分配内存来存储该 block 数据。如果未能完全开展迭代器,则返回一个包含 block 数据的迭代器,其对应的数据是由已经展开的数据和未展开的迭代器组成,在未能成功展开的情况下说明存储内存失败,将会根据存储级别判断是否需要存到磁盘,对应的方法是:
通过memoryStore读数据也有两种方式,一个是以字节buffer的形式返回指定的block数据,另一个是以迭代器的形式返回指定的block数据。
blockManager对外服务
blockManager典型的几个应用场景如下:
参考