Spark Standalone Source Code Analysis (Part 2)


This post describes the state transitions between the Master and the Worker when Spark starts up in standalone mode, with a brief analysis of the relevant code. First, a state diagram:

[Figure: Master/Worker state-transition diagram]
1. Starting the Master

"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT
 --webui-port $SPARK_MASTER_WEBUI_PORT
As this startup script shows, the Master's main entry point lives in spark.deploy.master.Master:

private[spark] object Master {
  def main(argStrings: Array[String]) {
    val args = new MasterArguments(argStrings)
    // Start an actor system bound to the requested ip/port; boundPort is the port actually bound
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
    // Create the Master actor; its preStart hook runs as part of initialization
    val actor = actorSystem.actorOf(
      Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master")
    // Block until the actor system shuts down
    actorSystem.awaitTermination()
  }
}
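AkkaUtils.createActorSystem is a small Spark helper whose source is not quoted here; conceptually it builds an Akka remoting configuration for the given host and port and returns the actor system together with the port it bound. A rough sketch under that assumption (the config keys are the Akka 2.0-era Netty remoting settings; the real helper also tunes timeouts and resolves port 0 to the port actually chosen):

  import akka.actor.ActorSystem
  import com.typesafe.config.ConfigFactory

  // Illustrative sketch, not the actual Spark source.
  def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
    val conf = ConfigFactory.parseString("""
      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      akka.remote.netty.hostname = "%s"
      akka.remote.netty.port = %d
      """.format(host, port))
    val system = ActorSystem(name, conf)
    (system, port)  // the real helper reports the port Akka actually bound
  }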
The main method, step by step:

1. Parse the arguments to obtain the required settings, such as the master IP and port;

2. Create the Master actor. Every actor instance runs its preStart method when it is initialized. The Master's preStart starts the web UI and, in order to track the state of the worker nodes connected to the master, subscribes to RemoteClientLifeCycleEvent, which covers all outbound-related remote events. When a worker disconnects or shuts down, receive gets the corresponding event and removes whichever the disconnected client was: the worker node, or the job submitted from that address (a sketch of removeWorker follows the code):

  override def preStart() {
    logInfo("Starting Spark master at spark://" + ip + ":" + port)
    // Listen for remote client disconnection events, since they don't go through Akka's watch()
    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
    startWebUi()
  }

  // In Master.receive:
    case RemoteClientDisconnected(transport, address) => {
      // The disconnected client could've been either a worker or a job; remove whichever it was
      addressToWorker.get(address).foreach(removeWorker)
      addressToJob.get(address).foreach(removeJob)
    }

    case RemoteClientShutdown(transport, address) => {
      // The disconnected client could've been either a worker or a job; remove whichever it was
      addressToWorker.get(address).foreach(removeWorker)
      addressToJob.get(address).foreach(removeJob)
    }
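removeWorker itself is not quoted above; conceptually it unregisters the worker from the Master's bookkeeping maps and tells each affected job that its executors on that node are gone. A minimal sketch under those assumptions (only addressToWorker appears in the quoted code; workers, idToWorker, and the ExecutorStateChanged message are assumed names):

    // Illustrative sketch, not the actual Spark source.
    def removeWorker(worker: WorkerInfo) {
      logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
      workers -= worker                             // assumed HashSet[WorkerInfo]
      idToWorker -= worker.id                       // assumed HashMap[String, WorkerInfo]
      addressToWorker -= worker.actor.path.address  // the map used in receive above
      for (exec <- worker.executors.values) {
        // Assumed message telling the job's driver that its executor was lost
        exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None)
      }
    }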

2. Starting the Worker

"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
The Worker's main entry point lives in spark.deploy.worker.Worker. It initializes the Worker actor, whose connectToMaster method does the following:

1. Check that the master URL is well-formed;

2. Build an ActorRef for the Master actor and send it a RegisterWorker message; upon receiving it, the Master adds the worker node to a HashSet of workers that it maintains (a sketch of that handler follows the code below);

  def connectToMaster() {
    masterUrl match {
      case MASTER_REGEX(masterHost, masterPort) => {
        logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
        val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
        try {
          master = context.actorFor(akkaUrl)
          master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
          context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
          context.watch(master) // Doesn't work with remote actors, but useful for testing
        } catch {
          case e: Exception =>
            logError("Failed to connect to master", e)
            System.exit(1)
        }
      }

      case _ =>
        logError("Invalid master URL: " + masterUrl)
        System.exit(1)
    }
  }
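On the Master side, the handler for RegisterWorker (item 2 above) is not quoted here; the following is a minimal sketch of what it plausibly looks like, assuming an addWorker helper that fills the workers HashSet and RegisteredWorker/RegisterWorkerFailed reply messages:

    // In Master.receive -- illustrative sketch, not the actual Spark source.
    case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
      if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")  // assumed reply message
      } else {
        // addWorker (assumed helper) records the worker in the workers HashSet
        val worker = addWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress)
        addressToWorker(sender.path.address) = worker
        sender ! RegisteredWorker                             // assumed acknowledgement
      }
    }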
3. After sending the registration message, the Worker actor, like the Master actor, subscribes to RemoteClientLifeCycleEvent; in its receive method, the relevant events all trigger masterDisconnected, which kills this node's executors and exits (a sketch of masterDisconnected follows the code):

    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
      masterDisconnected()
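masterDisconnected is not quoted above; per the description, it kills this node's executors and exits. A minimal sketch under that assumption (the executors map and the kill() method are assumed names):

    // Illustrative sketch, not the actual Spark source.
    def masterDisconnected() {
      logError("Connection to master failed! Shutting down.")
      executors.values.foreach(_.kill())  // assumed map of running executor runners
      System.exit(1)
    }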


Addendum: Akka's Remote System Events

The book Akka Concurrency describes Akka's remote nodes: in Akka, nodes are peers of one another; to each other they are both clients and servers, as the figure shows:

[Figure: Plane and Airport nodes as peers, each acting as both client and server]
Taking the Plane node's point of view, all outbound events are client events and all inbound events are server events; the same holds for the Airport node.
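To make the taxonomy concrete, here is a minimal listener sketch using the Akka 2.0-era remoting API that this version of Spark builds on: RemoteClientLifeCycleEvent covers the outbound (client) side, and its counterpart RemoteServerLifeCycleEvent covers the inbound (server) side.

import akka.actor.Actor
import akka.remote.{RemoteClientLifeCycleEvent, RemoteServerLifeCycleEvent}

// Sketch: subscribe to both sides of the remote lifecycle event taxonomy.
class RemoteEventLogger extends Actor {
  override def preStart() {
    // Outbound (client) events: connections this node initiated
    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
    // Inbound (server) events: connections other nodes opened to this one
    context.system.eventStream.subscribe(self, classOf[RemoteServerLifeCycleEvent])
  }
  def receive = {
    case event => println("remote lifecycle event: " + event)
  }
}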
