设为首页 收藏本站
查看: 779|回复: 0

[经验分享] Apache Spark源码走读之4

[复制链接]

尚未签到

发表于 2015-8-5 08:25:59 | 显示全部楼层 |阅读模式
  欢迎转载,转载请注明出处,徽沪一郎。
  Spark Streaming能够对流数据进行近乎实时的速度进行数据处理。采用了不同于一般的流式数据处理模型,该模型使得Spark Streaming有非常高的处理速度,与storm相比拥有更高的吞能力。
  本篇简要分析Spark Streaming的处理模型,Spark Streaming系统的初始化过程,以及当接收到外部数据时后续的处理步骤。

系统概述

流数据的特点
  与一般的文件(即内容已经固定)型数据源相比,所谓的流数据拥有如下的特点


  • 数据一直处在变化中
  • 数据无法回退
  • 数据一直源源不断的涌进

DStream
  如果要用一句话来概括Spark Streaming的处理思路的话,那就是"将连续的数据持久化,离散化,然后进行批量处理"。
  让我们来仔细分析一下这么作的原因。


  • 数据持久化 将从网络上接收到的数据先暂时存储下来,为事件处理出错时的事件重演提供可能,
  • 离散化 数据源源不断的涌进,永远没有一个尽头,就像周星驰的喜剧中所说“崇拜之情如黄河之水绵绵不绝,一发而不可收拾”。既然不能穷尽,那么就将其按时间分片。比如采用一分钟为时间间隔,那么在连续的一分钟内收集到的数据集中存储在一起。
  • 批量处理 将持久化下来的数据分批进行处理,处理机制套用之前的RDD模式
  DStream可以说是对RDD的又一层封装。如果打开DStream.scala和RDD.scala,可以发现几乎RDD上的所有operation在DStream中都有相应的定义。
  作用于DStream上的operation分成两类


  • Transformation
  • Output 表示将输出结果,目前支持的有print, saveAsObjectFiles, saveAsTextFiles, saveAsHadoopFiles

DStreamGraph
  有输入就要有输出,如果没有输出,则前面所做的所有动作全部没有意义,那么如何将这些输入和输出绑定起来呢?这个问题的解决就依赖于DStreamGraph,DStreamGraph记录输入的Stream和输出的Stream。

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
var rememberDuration: Duration = null
var checkpointInProgress = false


  outputStreams中的元素是在有Output类型的Operation作用于DStream上时自动添加到DStreamGraph中的。
  outputStream区别于inputStream一个重要的地方就是会重载generateJob.

初始化流程
DSC0000.png

StreamingContext
  StreamingContext是Spark Streaming初始化的入口点,主要的功能是根据入参来生成JobScheduler

设定InputStream
  如果流数据源来自于socket,则使用socketStream。如果数据源来自于不断变化着的文件,则可使用fileStream

提交运行
  StreamingContext.start()
  

数据处理

  以socketStream为例,数据来自于socket。
  SocketInputDstream启动一个线程,该线程使用receive函数来接收数据

def receive() {                                                                                                         
var socket: Socket = null                                                                                             
try {                                                                                                                  
logInfo("Connecting to " + host + ":" + port)                                                                        
socket = new Socket(host, port)                                                                                      
logInfo("Connected to " + host + ":" + port)                                                                        
val iterator = bytesToObjects(socket.getInputStream())                                                               
while(!isStopped && iterator.hasNext) {                                                                              
store(iterator.next)                                                                                               
}                                                                                                                    
logInfo("Stopped receiving")                                                                                         
restart("Retrying connecting to " + host + ":" + port)                                                               
} catch {                                                                                                              
case e: java.net.ConnectException =>                                                                                 
restart("Error connecting to " + host + ":" + port, e)                                                            
case t: Throwable =>                                                                                                
restart("Error receiving data", t)                                                                                 
} finally {      
if (socket != null) {                                                                                                
socket.close()                                                                                                     
logInfo("Closed socket to " + host + ":" + port)                                                                  
}                                                                                                                    
}                                                                                                                     
}                                                                                                                        
}        


  接收到的数据会被先存储起来,存储最终会调用到BlockManager.scala中的函数,那么BlockManager是如何被传递到StreamingContext的呢?利用SparkEnv传入的,注意StreamingContext构造函数的入参。


处理定时器
  数据的存储有是被socket触发的。那么已经存储的数据被真正的处理又是被什么触发的呢?
  记得在初始化StreamingContext的时候,我们指定了一个时间参数,那么用这个参数会构造相应的重复定时器,一旦定时器超时,调用generateJobs函数。

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
  事件处理函数

/** Processes all events */                                                                                             
private def processEvent(event: JobGeneratorEvent) {                                                                     
logDebug("Got event " + event)                                                                                         
event match {                                                                                                         
case GenerateJobs(time) => generateJobs(time)                                                                        
case ClearMetadata(time) => clearMetadata(time)                                                                     
case DoCheckpoint(time) => doCheckpoint(time)                                                                        
case ClearCheckpointData(time) => clearCheckpointData(time)                                                         
}                                                                                                                     
}     


  generteJobs

private def generateJobs(time: Time) {                                                                                   
SparkEnv.set(ssc.env)                                                                                                  
Try(graph.generateJobs(time)) match {                                                                                 
case Success(jobs) =>                                                                                                
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>                                             
val streamId = stream.id                                                                                         
val receivedBlockInfo = stream.getReceivedBlockInfo(time)                                                        
(streamId, receivedBlockInfo)                                                                                    
}.toMap                                                                                                            
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))                                                   
case Failure(e) =>                                                                                                   
jobScheduler.reportError("Error generating jobs for time " + time, e)                                             
}                                                                                                                     
eventActor ! DoCheckpoint(time)                                                                                       
}         


  generateJobs->generateJob一路下去会调用到Job.run,在job.run中调用sc.runJob,在具体调用路径就不一一列出。

private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
job.run()
eventActor ! JobCompleted(job)
}
}


  DStream.generateJob函数中定义了jobFunc,也就是在job.run()中使用到的jobFunc

  private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}


  在这个流程中,DStreamGraph起到非常关键的作用,非常类似于TridentStorm中的graph.
  在generateJob过程中,DStream会通过调用compute函数生成相应的RDD,SparkContext则是将基于RDD的抽象转换成为多个stage,而执行。
  StreamingContext中一个重要的转换就是DStream到RDD的转换,而SparkContext中一个重要的转换是RDD到Stage及Task的转换。在这两个不同的抽象类中,要注意其中getOrCompute和compute函数的实现。

小结
  本篇内容有点仓促,内容不够丰富翔实,争取回头有空的时候再好好丰富一下具体的调用路径。
  对于容错处理机制,本文没有涉及,待研究明白之后另起一篇进行阐述。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-94203-1-1.html 上篇帖子: 给Tomcat,Apache配置gzip压缩(HTTP压缩)功能 下篇帖子: The file cannot be validated as theho st "struts.apache.org" is currently unreac
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表