Spark的standalone源码分析(三)

论坛 期权论坛 脚本     
已经匿名di用户   2022-5-29 19:20   1402   0

本文描述SparkContext实例初始化的过程中,spark后台启动的一系列的服务,以及它们之间的交互。

1. SparkContext类

* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

从s park源码中的comments中可以看到,sparkContext类是spark运行的主要集成类,它负责与spark集群的connection,并且负责数据的生成和计算以及其中的task的调度;在Hadoop系统中,集群的调度有两大类:Job调度和Task调度。Spark也类似,如上节所述,Job的调度有master节点的schedule()负责,Task的调度则有sparkcontext实例初始化时开启的ClusterScheduler负责(实际上,可以视为clusterSchedule为一个插件箱,里面可以配置各种各样的backend服务);下面分别介绍一个sparkcontext初始化中的服务;

2. SparkContext初始化

下述代码在core/src/main/scala/spark/sparkcontext.scala文件;
初始化实例的时候,必须要指定master节点的URL,
def this(master: String, jobName: String) = this(master, jobName, null, Nil, Map())

  // Create the Spark execution environment (cache, map output tracker, etc)
  private[spark] val env = SparkEnv.createFromSystemProperties(
    System.getProperty("spark.master.host"),
    System.getProperty("spark.master.port").toInt,
    true,
    isLocal)
  SparkEnv.set(env)

SparkEnv.createFromSystemProperties会依次在master节点,开启如下服务,在scala中其实就是actor: BlockManager,BroadcastManager,CacheTracker(to be removed instead of blockmanager),MapOutputTracker,ShuffleFetcher,HttpFileServer; 下面对这些服务做简要的介绍。

2.1 BlockManager

在hadoop中,数据块由HDFS服务管理,数据块的存放位置、大小信息存放在Master节点,而源数据分布在集群中的slaver节点。与Hadoop类似,spark的BlockManager也负责管理集群中的block,即数据块的信息。master节点和work节点都会生成BlockManager实例,通过masterActor进行block信息的通信;
首先初始化的时候,各个节点的BlockManager都会初始化一个BlockManagerMaster实例:在master节点,会创建BlockManagerMasterActor,如果是在work节点,则masterActor将初始化为BlockManagerMasterActor的actor引用;
  var masterActor: ActorRef = {
    if (isMaster) {
      val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
        name = MASTER_AKKA_ACTOR_NAME)
      logInfo("Registered BlockManagerMaster Actor")
      masterActor
    } else {
      val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
      logInfo("Connecting to BlockManagerMaster: " + url)
      actorSystem.actorFor(url)
    }
  }
每个节点的BlockManager实例化时,首先向master节点masterActor注册blockManagerID,每一个节点的BlockManager信息,会存于master节点的blockManagerInfo;其次,每个节点会初始化一个BlockManagerWorker实例,它定义了集群中master节点和worker节点之间数据传输的网络接口(network interface),其中由接口可以看到集群中的block数据的操作主要有两种操作:put和get;BlockManagerWorker网络接口processBlockMessage(blockMessage: BlockMessage)会根据message的类型做不同的处理;
  private def initialize() {
    logInfo("Initializing BlockManager.....")
    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
    BlockManagerWorker.startBlockManagerWorker(this)
    if (!BlockManager.getDisableHeartBeatsForTesting) {
      heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
        heartBeat()
      }
    }
  }
  def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
    blockMessage.getType match {
      case BlockMessage.TYPE_PUT_BLOCK => {
        val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
        logDebug("Received [" + pB + "]")
        putBlock(pB.id, pB.data, pB.level)
        return None
      } 
      case BlockMessage.TYPE_GET_BLOCK => {
        val gB = new GetBlock(blockMessage.getId)
        logDebug("Received [" + gB + "]")
        val buffer = getBlock(gB.id)
        if (buffer == null) {
          return None
        }
        return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
      }
      case _ => return None
    }
  }
另外,block的同步会有数据的传输,BlockManger会开启一个ConnectionManager后台服务,该服务采用java nio处理节点之间socket通信;BlockManagerWorker会将消息的处理接口注册为ConnectionManager的回调函数,从而处理相关的消息;
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
简言之,BlockManager负责集群中的数据同步和提取。每个节点都会创建一个BlockManager实例,worker节点和master节点之间通过masterActor通信;节点与节点之间的数据传输通过ConnectionManager的后台服务。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:81
帖子:4969
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP