This article describes the services that Spark starts in the background while a SparkContext instance is being initialized, and how those services interact.
1. The SparkContext class
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
As this comment in the Spark source states, SparkContext is the main entry class for Spark: it manages the connection to the Spark cluster and is responsible for creating data (RDDs), running computations, and scheduling the resulting tasks. As in Hadoop, cluster scheduling falls into two categories: job scheduling and task scheduling. Spark is similar: as described in the previous section, job scheduling is handled by the master node's schedule(), while task scheduling is handled by the ClusterScheduler started when the SparkContext instance is initialized (the ClusterScheduler can in fact be viewed as a plugin container into which various backend services can be configured). The services started during SparkContext initialization are introduced below.
2. SparkContext initialization
The code below is from core/src/main/scala/spark/SparkContext.scala.
When creating an instance, the URL of the master node must be specified:
def this(master: String, jobName: String) = this(master, jobName, null, Nil, Map())
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
System.getProperty("spark.master.host"),
System.getProperty("spark.master.port").toInt,
true,
isLocal)
SparkEnv.set(env)
SparkEnv.createFromSystemProperties starts the following services in turn on the master node (in Scala these are, in effect, actors):
BlockManager, BroadcastManager, CacheTracker (slated for removal in favor of BlockManager), MapOutputTracker, ShuffleFetcher, HttpFileServer.
These services are briefly introduced below.
2.1 BlockManager
In Hadoop, data blocks are managed by HDFS: block locations and sizes are kept on the master node, while the data itself is distributed across the slave nodes of the cluster. Spark's BlockManager plays a similar role, managing the metadata of the blocks in the cluster. Both the master node and the worker nodes create BlockManager instances, and block information is exchanged through a masterActor.
At initialization time, the BlockManager on every node creates a BlockManagerMaster instance: on the master node this creates the BlockManagerMasterActor itself, while on a worker node masterActor is initialized as an actor reference to that BlockManagerMasterActor:
var masterActor: ActorRef = {
  if (isMaster) {
    val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
      name = MASTER_AKKA_ACTOR_NAME)
    logInfo("Registered BlockManagerMaster Actor")
    masterActor
  } else {
    val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
    logInfo("Connecting to BlockManagerMaster: " + url)
    actorSystem.actorFor(url)
  }
}
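The branch above follows a simple pattern: the master node constructs the real actor, while worker nodes only build a reference (here, an Akka URL) with which to reach it remotely. A stripped-down sketch of that pattern, with no Akka dependency and with illustrative names rather than Spark's own types:

```scala
// Simplified sketch of the master/worker branch above, without Akka.
// LocalMaster and RemoteMaster are illustrative stand-ins, not Spark types.
object MasterOrRef {
  sealed trait MasterRef
  case object LocalMaster extends MasterRef              // real service, created on the master
  case class RemoteMaster(url: String) extends MasterRef // remote reference held by workers

  def makeMasterRef(isMaster: Boolean, masterIp: String, masterPort: Int): MasterRef =
    if (isMaster) LocalMaster
    else RemoteMaster("akka://spark@%s:%s/user/BlockManagerMaster".format(masterIp, masterPort))
}
```

Calling makeMasterRef(false, "10.0.0.1", 7077) yields a remote reference built with the same URL format as in the snippet above.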
When the BlockManager on a node is instantiated, it first registers its blockManagerId with the masterActor on the master node; the master records each node's BlockManager information in its blockManagerInfo map. Next, each node creates a BlockManagerWorker instance, which defines the network interface for data transfer between the master node and the worker nodes. As the interface shows, there are two main operations on block data in the cluster: put and get. The BlockManagerWorker entry point processBlockMessage(blockMessage: BlockMessage) handles each message according to its type:
private def initialize() {
  logInfo("Initializing BlockManager.....")
  master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
  BlockManagerWorker.startBlockManagerWorker(this)
  if (!BlockManager.getDisableHeartBeatsForTesting) {
    heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
      heartBeat()
    }
  }
}
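initialize() schedules a heartbeat that fires immediately and then at a fixed frequency. A minimal, self-contained sketch of that scheduling, using a plain ScheduledExecutorService in place of the Akka scheduler (names here are illustrative, not Spark's):

```scala
import java.util.concurrent.{Executors, TimeUnit, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger

object HeartBeatSketch {
  // Run a stand-in heartBeat() immediately, then at a fixed period,
  // and return once at least `n` beats have fired.
  def runHeartbeats(n: Int, periodMs: Long = 20): Int = {
    val beats = new AtomicInteger(0)
    val done = new CountDownLatch(n)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = { beats.incrementAndGet(); done.countDown() }
    }, 0, periodMs, TimeUnit.MILLISECONDS) // like schedule(0.seconds, frequency)
    done.await()
    scheduler.shutdown()
    beats.get()
  }
}
```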
def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
  blockMessage.getType match {
    case BlockMessage.TYPE_PUT_BLOCK => {
      val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
      logDebug("Received [" + pB + "]")
      putBlock(pB.id, pB.data, pB.level)
      return None
    }
    case BlockMessage.TYPE_GET_BLOCK => {
      val gB = new GetBlock(blockMessage.getId)
      logDebug("Received [" + gB + "]")
      val buffer = getBlock(gB.id)
      if (buffer == null) {
        return None
      }
      return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
    }
    case _ => return None
  }
}
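The dispatch logic above reduces to a small pattern: a put stores the block and returns no reply, while a get replies with the block only if it is present. A self-contained sketch of that logic, where the message types and the Map-backed store are simplified stand-ins for Spark's actual classes:

```scala
// Simplified stand-in for BlockManagerWorker's message dispatch.
object BlockDispatchSketch {
  sealed trait BlockMessage
  case class PutBlock(id: String, data: Array[Byte]) extends BlockMessage
  case class GetBlock(id: String) extends BlockMessage
  case class GotBlock(id: String, data: Array[Byte]) extends BlockMessage

  private val store = scala.collection.mutable.Map[String, Array[Byte]]()

  // put: store the block, no reply; get: reply with the block if present.
  def process(msg: BlockMessage): Option[BlockMessage] = msg match {
    case PutBlock(id, data) => store(id) = data; None
    case GetBlock(id)       => store.get(id).map(GotBlock(id, _))
    case _                  => None
  }
}
```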
In addition, block synchronization involves transferring data, so the BlockManager starts a ConnectionManager background service that uses Java NIO to handle socket communication between nodes. The BlockManagerWorker registers its message-handling entry point as a ConnectionManager callback, so that incoming messages are routed to it:
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
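The registration in the line above follows the usual callback pattern: the connection layer stores a handler function and invokes it for every incoming message. A single-threaded sketch of that pattern, as a stand-in for ConnectionManager's NIO loop (names are illustrative):

```scala
// Simplified stand-in for ConnectionManager: it holds one registered
// handler and routes each "received" message to it.
class ConnectionSketch {
  private var handler: String => Option[String] = _ => None

  // Mirrors connectionManager.onReceiveMessage(...): install the callback.
  def onReceiveMessage(h: String => Option[String]): Unit = { handler = h }

  // Simulate a message arriving from the network; returns the reply, if any.
  def deliver(msg: String): Option[String] = handler(msg)
}
```

For example, a handler that answers only "get:" messages can be installed with conn.onReceiveMessage(m => if (m.startsWith("get:")) Some(m.drop(4)) else None); any other message then yields no reply.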
In short, the BlockManager is responsible for synchronizing and fetching data across the cluster. Every node creates a BlockManager instance; worker nodes communicate with the master node through the masterActor, and node-to-node data transfer goes through the ConnectionManager background service.