Compared with Hadoop, Spark's strength lies in iterative computation, notably the implementation of many machine learning algorithms. Such jobs often need to distribute large read-only data, such as dictionary tables or shared configuration files, to every node. If this data were captured in the task closure, it would be shipped again with every iteration, which is unnecessary. Similar to Hadoop's DistributedCache, Spark's broadcast service synchronizes the data to every worker node, and does so exactly once. The Spark paper also analyzes in detail the performance of the default HDFS-based broadcast strategy and compares it against Chained Streaming Broadcast, BitTorrent Broadcast, and SplitStreaming Broadcast; interested readers can look it up. This section walks through the implementations of HttpBroadcast, TreeBroadcast, and BitTorrentBroadcast in the Spark source.
A broadcast is created by calling the broadcast method on a SparkContext instance. For example, HadoopRDD synchronizes its JobConf object this way (sc below is a SparkContext instance):
val confBroadcast = sc.broadcast(new SerializableWritable(conf))
sc.broadcast in turn asks the BroadcastManager to construct a concrete Broadcast instance:
/**
* Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
* reading it in distributed functions. The variable will be sent to each cluster only once.
*/
def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
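For orientation, here is a minimal driver-side usage sketch (the dictionary and the RDD are invented for illustration): broadcast a small lookup table once, then reference it from closures that run on the workers.

// Hypothetical example: ship a small dictionary to all workers once,
// then read it inside a map closure instead of capturing it directly.
val dict = sc.broadcast(Map("a" -> 1, "b" -> 2))
val counts = sc.parallelize(Seq("a", "b", "c"))
  .map(word => dict.value.getOrElse(word, 0)) // each worker reads its local copy
  .collect()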
2) HttpBroadcast
spark.broadcast.HttpBroadcast broadcasts objects by running an HTTP server (a wrapped Jetty server) on the master node. At initialization, the master starts the server, while worker nodes pick up the master's server URI from a system property:
def initialize(isMaster: Boolean) {
  synchronized {
    if (!initialized) {
      bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
      compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
      if (isMaster) {
        createServer()
      }
      serverUri = System.getProperty("spark.httpBroadcast.uri")
      initialized = true
    }
  }
}
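createServer itself is not shown above. Roughly, it creates a temporary directory to hold the broadcast files and starts a Jetty-backed HttpServer rooted at it; the sketch below is paraphrased from the Spark source of this era, so details may differ slightly:

private def createServer() {
  // Temp directory that will hold one broadcast-<id> file per variable
  broadcastDir = Utils.createTempDir()
  server = new HttpServer(broadcastDir)
  server.start()
  serverUri = server.uri
  // Workers read this property back in initialize()
  System.setProperty("spark.httpBroadcast.uri", serverUri)
}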
When sc.broadcast is called (this runs on the master node), a broadcast id is generated and the <id, value> pair is handed to the BlockManager. (Note: the BlockManager does not report this block's status to the master; it acts purely as a cache here, ensuring each broadcast variable is synchronized only once.) Because value_ is declared @transient, it is skipped when the Broadcast object itself is serialized, which avoids copying the large payload around the cluster inside closures. On write, HttpBroadcast serializes the value into a file named broadcast-<id> under a temporary directory on the master node.
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
  extends Broadcast[T](id) with Logging with Serializable {

  def value = value_

  def blockId: String = "broadcast_" + id

  HttpBroadcast.synchronized {
    SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
  }

  if (!isLocal) {
    HttpBroadcast.write(id, value_)
  }
  ...
}

// In the HttpBroadcast companion object:
def write(id: Long, value: Any) {
  val file = new File(broadcastDir, "broadcast-" + id)
  val out: OutputStream = if (compress) {
    new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
  } else {
    new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
  }
  val ser = SparkEnv.get.serializer.newInstance()
  val serOut = ser.serializeStream(out)
  serOut.writeObject(value)
  serOut.close()
}
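As an aside, the effect of @transient is easy to demonstrate outside Spark. The following minimal, self-contained snippet (not Spark code; the class and names are invented) shows that a transient field simply vanishes across a Java-serialization round trip:

import java.io._

// A transient field is skipped by Java serialization, so shipping the
// wrapper object does not ship the (possibly huge) payload with it.
class Holder(@transient var payload: Array[Byte]) extends Serializable

object TransientDemo {
  def main(args: Array[String]) {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(new Holder(Array.fill(1 << 20)(1.toByte)))
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = in.readObject().asInstanceOf[Holder]
    println(copy.payload == null) // true: the payload was not serialized
  }
}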
When a worker node deserializes the Broadcast instance, it begins pulling the content for that broadcast id from the master. Concretely, each Broadcast subclass overrides the private readObject method, which defines the strategy for fetching the data. In HttpBroadcast, the worker recovers the blockId during deserialization and first asks its local BlockManager whether that block is already present. If it is, the cached value is used directly; otherwise the worker fetches the contents of the broadcast-<id> file over HTTP from the master, then stores the <id, value> pair into its BlockManager.
// Called by the JVM when deserializing an object
private def readObject(in: ObjectInputStream) {
  in.defaultReadObject()
  HttpBroadcast.synchronized {
    SparkEnv.get.blockManager.getSingle(blockId) match {
      case Some(x) => value_ = x.asInstanceOf[T]
      case None => {
        logInfo("Started reading broadcast variable " + id)
        val start = System.nanoTime
        value_ = HttpBroadcast.read[T](id)
        SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
        val time = (System.nanoTime - start) / 1e9
        logInfo("Reading broadcast variable " + id + " took " + time + " s")
      }
    }
  }
}
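The companion object's read method mirrors write: the worker streams the broadcast-<id> file from the master's HTTP server and deserializes it, with the same compression choices as write. A sketch paraphrased from the source of this era (details may differ slightly):

def read[T](id: Long): T = {
  val url = serverUri + "/broadcast-" + id
  val in: InputStream = if (compress) {
    new LZFInputStream(new URL(url).openStream()) // Does its own buffering
  } else {
    new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
  }
  val ser = SparkEnv.get.serializer.newInstance()
  val serIn = ser.deserializeStream(in)
  val obj = serIn.readObject[T]()
  serIn.close()
  obj
}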
As we can see, HttpBroadcast funnels all traffic through a single central node; as the cluster grows, this synchronization scheme clearly does not scale well.
3) TreeBroadcast
When the broadcast factory initializes TreeBroadcast, the master node starts a TrackMultipleValues service. It listens on MasterTrackerPort and handles three kinds of messages:
1) REGISTER_BROADCAST_TRACKER: add the sender's host address and listen port to valueToGuideMap;
2) UNREGISTER_BROADCAST_TRACKER: blank out the sender's entry in valueToGuideMap;
3) FIND_BROADCAST_TRACKER: look up the requested id and reply with the corresponding SourceInfo.
def initialize(isMaster__ : Boolean) {
  synchronized {
    ........
    if (isMaster) {
      trackMV = new TrackMultipleValues
      trackMV.setDaemon(true)
      trackMV.start()
      // Set masterHostAddress to the master's IP address for the slaves to read
      System.setProperty("spark.MultiTracker.MasterHostAddress", Utils.localIpAddress)
    }
    initialized = true
  }
}
if (messageType == REGISTER_BROADCAST_TRACKER) {
  // Receive Long
  val id = ois.readObject.asInstanceOf[Long]
  // Receive hostAddress and listenPort
  val gInfo = ois.readObject.asInstanceOf[SourceInfo]
  // Add to the map
  valueToGuideMap.synchronized {
    valueToGuideMap += (id -> gInfo)
  }
  // Send dummy ACK
  oos.writeObject(-1)
  oos.flush()
} else if (messageType == UNREGISTER_BROADCAST_TRACKER) {
  // Receive Long
  val id = ois.readObject.asInstanceOf[Long]
  // Remove from the map
  valueToGuideMap.synchronized {
    valueToGuideMap(id) = SourceInfo("", SourceInfo.TxOverGoToDefault)
  }
  // Send dummy ACK
  oos.writeObject(-1)
  oos.flush()
} else if (messageType == FIND_BROADCAST_TRACKER) {
  // Receive Long
  val id = ois.readObject.asInstanceOf[Long]
  var gInfo =
    if (valueToGuideMap.contains(id)) valueToGuideMap(id)
    else SourceInfo("", SourceInfo.TxNotStartedRetry)
  // Send reply back
  oos.writeObject(gInfo)
  oos.flush()
}
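The worker side of FIND_BROADCAST_TRACKER is symmetric: open a socket to the tracker, send the message type and the broadcast id, and read back a SourceInfo, retrying while the guide is not registered yet. A hedged sketch (simplified; the name findGuide, the fixed one-second backoff, and the omitted error handling are mine, not Spark's):

def findGuide(id: Long): SourceInfo = {
  var gInfo = SourceInfo("", SourceInfo.TxNotStartedRetry)
  while (gInfo.listenPort == SourceInfo.TxNotStartedRetry) {
    val socket = new Socket(MasterHostAddress, MasterTrackerPort)
    val oos = new ObjectOutputStream(socket.getOutputStream)
    oos.flush()
    val ois = new ObjectInputStream(socket.getInputStream)
    oos.writeObject(FIND_BROADCAST_TRACKER) // message type first...
    oos.writeObject(id)                     // ...then the broadcast id
    oos.flush()
    gInfo = ois.readObject.asInstanceOf[SourceInfo] // tracker replies with a SourceInfo
    socket.close()
    if (gInfo.listenPort == SourceInfo.TxNotStartedRetry) {
      Thread.sleep(1000) // guide not registered yet, back off and retry
    }
  }
  gInfo
}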
Once initialization completes, and much like HttpBroadcast, calling broadcast on the SparkContext starts distributing the variable; during this, the following two services come into play:
1. The GuideMultipleRequests service, on the master, tracks which broadcast variable blocks each worker node holds (block count, sizes, and so on). When a worker calls receiveBroadcast, it first contacts this service with the broadcast id to obtain a suitable SourceInfo (the master applies a selection policy, described at the end of this section, to pick the node holding that SourceInfo), including that source's host address and listen port; the master then adds the requesting worker's own SourceInfo to listOfSources.
2. The ServeMultipleRequests service runs on every node in the cluster and serves the broadcast variable's blocks. After obtaining a SourceInfo, a worker downloads block data from that node. Since a node may be serving blocks while it is still downloading them, the blocks being synchronized are guarded with locks to keep the data consistent, as the wait loops in sendObject below show:
private def sendObject() {
  // Wait till receiving the SourceInfo from Master
  while (totalBlocks == -1) {
    totalBlocksLock.synchronized { totalBlocksLock.wait() }
  }
  for (i <- sendFrom until sendUntil) {
    while (i == hasBlocks) {
      hasBlocksLock.synchronized { hasBlocksLock.wait() }
    }
  }
  ..........
}
On the downloading side, receiveSingleTransmission pulls blocks one at a time from the selected source and wakes up any senders waiting on hasBlocksLock:
for (i <- hasBlocks until totalBlocks) {
  val recvStartTime = System.currentTimeMillis
  val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
  val receptionTime = (System.currentTimeMillis - recvStartTime)
  logDebug("Received block: " + bcBlock.blockID + " from " + sourceInfo + " in " + receptionTime + " millis.")
  arrayOfBlocks(hasBlocks) = bcBlock
  hasBlocks += 1
  // Set to true if at least one block is received
  receptionSucceeded = true
  hasBlocksLock.synchronized { hasBlocksLock.notifyAll() }
}
Finally, a brief word on the master's selectSuitableSource policy. Every worker that connects to the master is appended to listOfSources, and the master hands out sources with FIFO priority. At first only the master holds the complete data, so it is chosen as the source and every worker syncs from it; each worker that attaches increments the current source's leecher count. Once a source's leecher count reaches MaxDegree, it stops being chosen, and the next worker to have joined takes over as the source, and so on. When a worker finishes syncing all blocks, the leecher count of the source it was using is decremented and the worker itself is appended to listOfSources; since selection favors the source with the most leechers still under MaxDegree, such a source naturally takes priority.
private def selectSuitableSource(skipSourceInfo: SourceInfo): SourceInfo = {
  var maxLeechers = -1
  var selectedSource: SourceInfo = null

  listOfSources.foreach { source =>
    if ((source.hostAddress != skipSourceInfo.hostAddress ||
         source.listenPort != skipSourceInfo.listenPort) &&
        source.currentLeechers < MultiTracker.MaxDegree &&
        source.currentLeechers > maxLeechers) {
      selectedSource = source
      maxLeechers = source.currentLeechers
    }
  }

  // Update leecher count
  selectedSource.currentLeechers += 1

  return selectedSource
}
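To make the policy concrete, here is a small standalone toy model (not Spark code; all names are invented) that replays the greedy choice of selectSuitableSource with MaxDegree = 2 and, as a simplification, adds each worker to the source list right after it is served:

import scala.collection.mutable.ListBuffer

object SelectDemo {
  val MaxDegree = 2
  case class Src(name: String) { var leechers = 0 }
  val sources = ListBuffer(Src("master"))

  // Pick the source with the most leechers that is still below MaxDegree,
  // mirroring the loop in selectSuitableSource above.
  def select(): Src = {
    var best: Src = null
    var maxLeechers = -1
    for (s <- sources if s.leechers < MaxDegree && s.leechers > maxLeechers) {
      best = s
      maxLeechers = s.leechers
    }
    best.leechers += 1
    best
  }

  def main(args: Array[String]) {
    for (i <- 1 to 4) {
      val src = select()
      println("worker" + i + " downloads from " + src.name)
      sources += Src("worker" + i) // each served worker becomes a candidate source
    }
    // Prints: worker1 and worker2 download from master (saturating it at
    // MaxDegree = 2), then worker3 and worker4 download from worker1.
  }
}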