Spark的standalone源码分析(四)

论坛 期权论坛 脚本     
已经匿名di用户   2022-5-29 19:20   1175   0

承接上文,继续分析sparkcontext初始化中开启的broadcast服务,文中部分内容参考论文“Performance and Scalability of Broadcast in Spark ”;

2.2 BroadcastManager

相比hadoop,spark的优势在于迭代计算,尤其是一些机器学习算法的实现;在这类计算中,经常需要同步large read-only数据,比如字典表、通用的配置文件等。如果将这些数据和运算闭包绑定在一次,那每一次迭代都需要考虑这些数据,没有必要;类似于hadoop的distributeCache,spark通过broadcast服务,将数据同步到每一个worker节点,同时这些数据且只同步一次;另外,论文详细分析了spark默认的broadcast策略(基于HDFS)的性能,并与Chained Streaming Broadcast,BitTorrent Broadcast和SplitStreaming Broadcast做了比较,有兴趣的朋友可以去看一下。本节会逐一介绍spark源码中httpbroadcast、treebroadcast、bitTorrent broadcast的实现;

首先说一下broadcast的初始化和调用方法;

1) broadcastManager的初始化及调用

broadcastmanager初始化的时候,带有isMaster参数,即master节点和work节点的初始化操作是有区别的;
class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {

  private var initialized = false
  private var broadcastFactory: BroadcastFactory = null

broadcast的调用,是通过sparkcontext实例调用broadcast方法实现,比如HadoopRDD中通过broadcast同步JobConf对象,sc即sparkcontext的一个实例。
 val confBroadcast = sc.broadcast(new SerializableWritable(conf))
而sc.broadcast会初始化一个Broadcast的实体类:
 /**
   * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
   * reading it in distributed functions. The variable will be sent to each cluster only once.
   */
  def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T] (value, isLocal)

2)httpbroadcast

spark.broadcast.httpbroadcast是通过在master节点建立http-server(包的是jetty server)的方式来广播对象的。首先,主节点初始化的时候,开启服务,而worker节点从环境变量中获取主节点的serveruri;
  def initialize(isMaster: Boolean) {
    synchronized {
      if (!initialized) {
        bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
        compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
        if (isMaster) {
          createServer()
        }
        serverUri = System.getProperty("spark.httpBroadcast.uri")
        initialized = true
      }
    }
  }

sc.broadcast方法广播变量的时候(会在主节点执行),会首先生成一个broadcast_id,然后将<id,value>交由blockMangager管理(注:blockManager并不会把这个blockid的状态report给master节点,这里面blockManager只是起到了cache的作用,确保broadcast广播的变量只同步一次);由于value声明为transient,所以在broadcast对象序列化的时候会忽略value变量,这样避免大的数据对象在集群中的拷贝;httpbroadcast在write的时候,会将value写入主节点的临时目录下的以broadcast-id命名的文件内。
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
  
  def value = value_

  def blockId: String = "broadcast_" + id

  HttpBroadcast.synchronized {
    SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
  }

  if (!isLocal) { 
    HttpBroadcast.write(id, value_)
  }

  def write(id: Long, value: Any) {
    val file = new File(broadcastDir, "broadcast-" + id)
    val out: OutputStream = if (compress) {
      new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
    } else {
      new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
    }
    val ser = SparkEnv.get.serializer.newInstance()
    val serOut = ser.serializeStream(out)
    serOut.writeObject(value)
    serOut.close()
  }
worker节点在反序列化broadcast对象实例的时候,会开始从master节点同步broadcast_id对应的内容;具体实现是,每一个broadcast的派生类都overwrite了一个readObject的方法,该方法会定义从master节点同步数据的策略。在httpbroadcast中,worker节点反序列化broadcast对象获得blockid,首先请求blockmanager,检测blockmanager中是否已经存有该blockid,如果有的的话,则通过blockManager将value数据同步到worker节点,反之,则通过socket将blockid文件的内容同步到work节点,然后将<id,value>存入blockmanager。
  // Called by JVM when deserializing an object
  private def readObject(in: ObjectInputStream) {
    in.defaultReadObject()
    HttpBroadcast.synchronized {
      SparkEnv.get.blockManager.getSingle(blockId) match {
        case Some(x) => value_ = x.asInstanceOf[T]
        case None => {
          logInfo("Started reading broadcast variable " + id)
          val start = System.nanoTime
          value_ = HttpBroadcast.read[T](id)
          SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
          val time = (System.nanoTime - start) / 1e9
          logInfo("Reading broadcast variable " + id + " took " + time + " s")
        }
      }
    }
  }
可见,http-broadcast是采用中心节点同步的方式,随着集群规模的扩大,这样的同步方式显然效率不高;

3) TreeBroadcast

broadcast factory初始化treebroadcast时,在master节点,会初始化TrackMultipleValues服务;TrackMultipleValues服务监听MasterTrackerPort端口,处理接受到得三类数据:1)register-broadcast:将发送节点的hostaddress和监听端口信息加入valueToGuideMap;2)unregister-broadcast:将发送节点信息在valueToGuideMap里面置空;3)find-broadcast:根据发送节点的请求的id,返回对应的SourceInfo;
  def initialize(isMaster__ : Boolean) {
    synchronized {
 ........
        if (isMaster) {
          trackMV = new TrackMultipleValues
          trackMV.setDaemon(true)
          trackMV.start()    
          // Set masterHostAddress to the master's IP address for the slaves to read
          System.setProperty("spark.MultiTracker.MasterHostAddress", Utils.localIpAddress)
        }
        initialized = true
      }
    }
  }

if (messageType == REGISTER_BROADCAST_TRACKER) {
        // Receive Long
         val id = ois.readObject.asInstanceOf[Long]
         // Receive hostAddress and listenPort
         val gInfo = ois.readObject.asInstanceOf[SourceInfo]

         / Add to the map
         valueToGuideMap.synchronized {
               valueToGuideMap += (id -> gInfo)
         }

          // Send dummy ACK
          oos.writeObject(-1)
          oos.flush()
} else if (messageType == UNREGISTER_BROADCAST_TRACKER) {
         // Receive Long
         val id = ois.readObject.asInstanceOf[Long]

         // Remove from the map
         valueToGuideMap.synchronized {
                 valueToGuideMap(id) = SourceInfo("", SourceInfo.TxOverGoToDefault)
         }

         // Send dummy ACK
         oos.writeObject(-1)
         oos.flush()
} else if (messageType == FIND_BROADCAST_TRACKER) {
         // Receive Long
         val id = ois.readObject.asInstanceOf[Long]

         var gInfo =
             if (valueToGuideMap.contains(id)) valueToGuideMap(id)
              else SourceInfo("", SourceInfo.TxNotStartedRetry)

         // Send reply back
         oos.writeObject(gInfo)
         oos.flush()
}

在初始化完成之后,与httpbroadcast类似,treebroadcast的sparkcontext的实例会通过broadcast方法,开始广播变量,期间会在master节点开启如下两个服务:
1. GuideMultipleRequests服务,保存集群中每个work节点的存有的broadcast变量信息,包括变量block个数,大小等;work节点在receivebroadcast时,会首先请求该服务,根据broadcast-id,获得相应的SourceInfo(master节点有相应的策略选择拥有该SourceInfo节点),其中包括该SourceInfo所在的hostaddress以及listenport;master节点会将该work节点的SourceInfo加入listOfSources;
2. ServeMultipleRequests服务,会在集群中的每一个节点启动,用以broadcast变量的block块的同步;work节点在获得SourceInfo之后,会向对应的节点下载block数据;由于节点既分发数据又下载数据,需要对正在同步的block加锁来确保数据的一致性;
      private def sendObject() {
        // Wait till receiving the SourceInfo from Master
        while (totalBlocks == -1) {
          totalBlocksLock.synchronized { totalBlocksLock.wait() }
        }

        for (i <- sendFrom until sendUntil) {
          while (i == hasBlocks) {
            hasBlocksLock.synchronized { hasBlocksLock.wait() }
          }
 }
 ..........
      }
receiveSingleTransmission
      for (i <- hasBlocks until totalBlocks) {
        val recvStartTime = System.currentTimeMillis
        val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
        val receptionTime = (System.currentTimeMillis - recvStartTime)

        logDebug("Received block: " + bcBlock.blockID + " from " + sourceInfo + " in " + receptionTime + " millis.")

        arrayOfBlocks(hasBlocks) = bcBlock
        hasBlocks += 1
        
        // Set to true if at least one block is received
        receptionSucceeded = true
        hasBlocksLock.synchronized { hasBlocksLock.notifyAll() }
      }

最后简单介绍一下,master节点selectSuitableSource的策略:每一个work节点connect主节点的时候,都会被加入listOfSources,master节点按照FIFO的优先策略来选择source供work节点下载broadcast变量;首先只有master节点具有全部的数据,master节点选为源节点,每一个work节点从源节点同步数据,每连接上一个work节点,当前的源节点的leechers数加一,当源节点的leechers大于MaxDegree的时候,则停止该源节点,开始选择后来加入的work节点作为源节点,以此类推;当某个废弃的源节点完全同步成功所有的block后,该节点的leechers数减一,同时append入listOfSources,显然该节点的优先级别更高;
      private def selectSuitableSource(skipSourceInfo: SourceInfo): SourceInfo = {
        var maxLeechers = -1
        var selectedSource: SourceInfo = null

        listOfSources.foreach { source =>
          if ((source.hostAddress != skipSourceInfo.hostAddress || 
               source.listenPort != skipSourceInfo.listenPort) && 
            source.currentLeechers < MultiTracker.MaxDegree &&
            source.currentLeechers > maxLeechers) {
              selectedSource = source
              maxLeechers = source.currentLeechers
            }
        }

        // Update leecher count
        selectedSource.currentLeechers += 1
        return selectedSource
      }



分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:81
帖子:4969
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP