本文主要描述Spark的standalone模式启动时候,master和work的状态transfer,并简要分析相关的代码;先上一幅状态图
1. Master启动
"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT
--webui-port $SPARK_MASTER_WEBUI_PORT 由上述启动脚本可见,Master的启动主函数位于spark.deploy.master.Master:
private[spark] object Master {
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
val actor = actorSystem.actorOf(
Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master")
actorSystem.awaitTermination()
}
} 首先:
1. 解析参数,获得master-ip,master-port等必要参数;
2. 创建MasterActor,每一个Actor实例初始化的时候会执行preStart方法,Master对象的preStart方法会开启webui,并且为了监听connect到master的work节点的状态,MasterActor订阅监听RemoteClientLifeCycleEvent,监听所有的outbound-related events;当worker节点出现disconnected或者shutdown的时候,receive中接受到这样的event,会remove该work节点,以及在该work节点上提交的Job;
override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
startWebUi()
}
case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or a job; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
}
case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or a job; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
}
2. Worker的启动
"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1 Work启动的主函数位于spark.deploy.worker.Worker,其会初始化WorkActor,在connect to master的方法中执行如下操作:
1. 判断masterURL是否合法;
2. 生成MasterActor的ActorRef,并向MasterActor提交RegisterWorker的消息,MasterActor接到消息之后会将该Worker节点加入自己维护的一个works的hashset;
def connectToMaster() {
masterUrl match {
case MASTER_REGEX(masterHost, masterPort) => {
logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
case e: Exception =>
logError("Failed to connect to master", e)
System.exit(1)
}
}
case _ =>
logError("Invalid master URL: " + masterUrl)
System.exit(1)
}
} 3. 在提交注册消息以后,与MasterActor一样,WorkerActor也订阅监听RemoteClientLifeCycleEvent,并在receive方法里面,对相应的event调用masterDisconnected,kill掉该节点的executors,并退出:
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
补充,AKKA的Remote System Events
在AKKA Concurrency一书中,对AKKA的remote node做了描述,即在AKKA中,节点与节点之间是peer的概念,对于彼此来说,它们既是clients又是servers,如图所示:
当以plane node为主的话,outbound的都是client event,inbound的都是server event;对于Airport node亦然; |