
[Experience Sharing] (Custom Distribution) Lesson 13: Spark Streaming Source Code Walkthrough: Driver Fault Tolerance


Posted on 2019-1-31 06:36:37
  Topics covered:
1. Fault tolerance of ReceivedBlockTracker
2. Fault tolerance of DStream and JobGenerator

  

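  I. Fault tolerance
1. ReceivedBlockTracker manages the metadata of a running Spark Streaming application. This is the data level.
2. DStream and JobGenerator are the core of job scheduling. DStream is the logical level: it describes the computation to be performed.
3. JobGenerator is the job generation level: from the runtime point of view, it determines how far job scheduling has progressed.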

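  To reason about Driver fault tolerance, consider which components in the Driver hold state that must survive a failure.
1. ReceivedBlockTracker tracks the received data, so it needs fault tolerance. It gets it through a WAL (write ahead log).
2. DStreamGraph expresses the dependencies between DStreams. On recovery, the dependencies at the level of computation logic have to be restored from the DStreams. This is made fault tolerant through checkpointing.
3. JobGenerator captures how jobs are continuously generated from the data in ReceivedBlockTracker and the dependencies formed by the DStreams: which data has been consumed and how far processing has progressed.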

  
  In summary:
  
  
  
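  ReceivedBlockTracker:
1. ReceivedBlockTracker keeps track of all the data received while Spark Streaming runs and assigns it to the batches that need it. Every action is written to a log through the WAL, so if the Driver fails the tracker state can be recovered from that history. When the ReceivedBlockTracker is created, the checkpoint directory is used as the location for this history.
  We start from what happens after a Receiver has received data.
2. ReceivedBlockTracker.addBlock (source below):
When a Receiver has received data, its manager, ReceiverSupervisorImpl, reports the block metadata to the Driver, where it is handled by ReceivedBlockTracker. ReceivedBlockTracker first writes the metadata to the WAL file and only then adds it to the in-memory structure used by the scheduler of the running Spark Streaming application, that is, JobGenerator. JobGenerator cannot use the WAL directly: the WAL lives on disk, and JobGenerator works with the data structure cached in memory.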
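The ordering described above (log first, memory second) can be illustrated with a small toy model. This is only a sketch and not Spark's code: BlockInfo, ToyBlockTracker and the log function are hypothetical names made up for the example, and the log is reduced to a function that reports success or failure.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ReceivedBlockInfo; only the fields this sketch needs.
final case class BlockInfo(streamId: Int, blockId: String)

// Toy tracker: `log` plays the role of the write ahead log and returns false on failure.
class ToyBlockTracker(log: BlockInfo => Boolean) {
  private val queues = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

  // Log first; touch the in-memory queue only if the log write succeeded.
  def addBlock(info: BlockInfo): Boolean = {
    val written = log(info)
    if (written) synchronized {
      queues.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
    }
    written
  }

  // Snapshot of the blocks of one stream that have not been assigned to a batch yet.
  def unallocated(streamId: Int): List[BlockInfo] = synchronized {
    queues.get(streamId).map(_.toList).getOrElse(Nil)
  }
}
```

With a log that always fails, for example new ToyBlockTracker(_ => false), addBlock returns false and the in-memory queue stays empty, so memory never holds a block that the log does not know about.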

  

  
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    // After the block is reported, write the event to the WAL first
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        // Only once the WAL write has succeeded is the block metadata added to the block queue
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
  On the Driver side, the received block metadata is kept in streamIdToUnallocatedBlockQueues, whose structure is:
  

  
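private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
  allocateBlocksToBatch assigns the received blocks to a batch. Blocks are taken out per streamId, which shows that a Spark Streaming application can process data from several input sources.
So what exactly is batchTime?
batchTime is the time of the batch being generated. Every block received since the previous allocation, that is, everything still sitting in the unallocated queues, is assigned to it.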
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      // Drain every unallocated block of this stream
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      // Remember the most recent batch time for which blocks were allocated
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
  As time goes on, RDDs keep being generated, so old data has to be cleaned up. The cleanupOldBatches method does this:
  

  
/**
 * Clean up block information of old batches. If waitForCompletion is true, this method
 * returns only after the files are cleaned up.
 */
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
    timeToAllocatedBlocks --= timesToCleanup
    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}
  All of the methods above write to the WAL through writeToLog:
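/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}
  Summary:
The WAL covers the whole life cycle of the data: its creation, its consumption and its destruction. Each of the operations above is written to the WAL file first, before it takes effect in memory.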
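Because all three kinds of events are logged, a restarted Driver can rebuild the tracker state by replaying the log in order. The following is a minimal sketch of that idea under simplified assumptions: the event classes, TrackerState and TrackerRecovery are hypothetical names for this example, block metadata is reduced to a block id string, and batch times are plain milliseconds. It is not the actual recovery code of ReceivedBlockTracker.

```scala
import scala.collection.mutable

// Hypothetical, simplified log records; the real events carry ReceivedBlockInfo and Time.
sealed trait TrackerEvent
final case class BlockAdded(streamId: Int, blockId: String) extends TrackerEvent
final case class BatchAllocated(batchTime: Long, blocks: Map[Int, Seq[String]]) extends TrackerEvent
final case class BatchesCleaned(batchTimes: Seq[Long]) extends TrackerEvent

// In-memory state that a restarted driver would need to rebuild.
final class TrackerState {
  val unallocated = mutable.HashMap.empty[Int, mutable.Queue[String]]
  val allocated = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]

  // Apply one logged event, mirroring what the live code path did to memory.
  def replay(event: TrackerEvent): Unit = event match {
    case BlockAdded(streamId, blockId) =>
      unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
    case BatchAllocated(batchTime, blocks) =>
      // Blocks assigned to a batch are no longer unallocated.
      unallocated.values.foreach(_.clear())
      allocated(batchTime) = blocks
    case BatchesCleaned(batchTimes) =>
      allocated --= batchTimes
  }
}

object TrackerRecovery {
  // Rebuild the state by replaying the log from the oldest record to the newest.
  def recover(log: Seq[TrackerEvent]): TrackerState = {
    val state = new TrackerState
    log.foreach(state.replay)
    state
  }
}
```

Replaying two additions, an allocation for batch 1000, one more addition and a cleanup of batch 1000 leaves only the last block in the unallocated queue and no allocated batches, which is the state the tracker had before the failure.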
  

  

  
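  JobGenerator:
Checkpointing runs on a time interval tied to the batch duration, and a checkpoint is requested both before and after a batch executes.
The call flow around doCheckpoint:

  1. A quick look at generateJobs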
  
/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // Once the jobs for this batch have been generated and submitted, request a checkpoint
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
  2. processEvent receives the event
  

  
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater) // calls the doCheckpoint method
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
  3. The source of doCheckpoint:
  

  
/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time) // this is what ultimately leads to the RDD checkpoints
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
  4. updateCheckpointData in DStream, which is what ultimately leads to the checkpointing of RDDs:
  

  
/**
 * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
 * this stream. This is an internal method that should not be called directly. This is
 * a default implementation that saves only the file names of the checkpointed RDDs to
 * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
 * this method to save custom checkpoint data.
 */
private[streaming] def updateCheckpointData(currentTime: Time) {
  logDebug("Updating checkpoint data for time " + currentTime)
  checkpointData.update(currentTime)
  dependencies.foreach(_.updateCheckpointData(currentTime))
  logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}
  [Figure: JobGenerator fault tolerance]
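The condition in doCheckpoint means that a DoCheckpoint event only results in a checkpoint when the time elapsed since zeroTime is a whole multiple of the checkpoint duration. Below is a small sketch of that arithmetic on plain milliseconds. CheckpointSchedule and its two methods are hypothetical helpers written for this example; the real code works with the Time and Duration classes.

```scala
// Hypothetical helper on plain milliseconds; the real code uses Time and Duration.
object CheckpointSchedule {
  // Same test as (time - zeroTime).isMultipleOf(checkpointDuration), on raw milliseconds.
  def isCheckpointTime(timeMs: Long, zeroTimeMs: Long, checkpointMs: Long): Boolean =
    (timeMs - zeroTimeMs) % checkpointMs == 0

  // Batch times among the first `batches` batches at which a checkpoint would be written.
  def checkpointTimes(zeroTimeMs: Long, batchMs: Long, checkpointMs: Long, batches: Int): Seq[Long] =
    (1 to batches)
      .map(i => zeroTimeMs + i * batchMs)
      .filter(t => isCheckpointTime(t, zeroTimeMs, checkpointMs))
}
```

For example, with zeroTime 0, a 1000 ms batch duration and a 5000 ms checkpoint duration, CheckpointSchedule.checkpointTimes(0L, 1000L, 5000L, 10) selects the batches at 5000 and 10000. The other eight DoCheckpoint events pass through processEvent without writing anything.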


  

  

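  Reference blog: http://blog.csdn.net/snail_gesture/article/details/51492873#comments
Notes:
Source: DT_大数据梦工厂 (custom Spark distribution series)
For more content, follow the WeChat public account DT_Spark
If you are interested in big data and Spark, 王家林 gives a permanently free public Spark class every evening at 20:00, YY room number: 68917580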



