设为首页 收藏本站
查看: 1190|回复: 0

[经验分享] (版本定制)第1课:Spark Streaming另类在线实验及Spark Streaming本质理解

[复制链接]

尚未签到

发表于 2019-1-30 13:30:24 | 显示全部楼层 |阅读模式
  本节课内容:
      1、Spark Streaming另类在线实验解析
      2、Spark Streaming本质理解
  

      Spark Streaming是Spark Core上的一个子框架,如果我们能够完全精通这个子框架,我们就能够更好的驾驭Spark。Spark Streaming和Spark SQL是目前最流行的框架,从研究角度而言,Spark SQL有太多涉及到SQL优化的问题,不太适合用来深入研究。而Spark Streaming和其他的框架不同,它更像是Spark Core的一个应用程序。如果我们能深入的了解Spark Streaming,那我们就可以写出非常复杂的应用程序。
      Spark Streaming的优势是可以结合SparkSQL、图计算、机器学习,功能更加强大。这个时代,单纯的流计算已经无法满足客户的需求啦。在Spark中Spark Streaming也是最容易出现问题的,因为数据在不断的流动,程序在不断的运行,内部比较复杂。
  

  本次实验基于如下博客中的程序代码
IMF课程的第94课:SparkStreaming 实现广告计费系统中在线黑名单过滤实战

参考博客地址:http://lqding.blog.运维网.com/9123978/1769290

  为了更好的查看Job的运行情况,我们启动history-server
  root@spark-master:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/sbin# ./start-history-server.sh
  启动前先配置下history log日志目录
root@spark-master:/tmp# hdfs dfs -mkdir /historyServerForSpark/
配置spark-env.sh,添加一个环境变量,让history server的logDirectory指向上面建立的目录
  
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://spark-master:8020/historyServerForSpark"
  

  配置spark-defaults.conf,添加如下配置项:
#是否记录作业产生的事件或者运行状态(job,stage等使用内存等信息)  
spark.eventLog.enabled           true
#如果记录作业产生的事件或者运行状态,则将事件写入什么位置  
spark.eventLog.dir             hdfs://spark-master:8020/historyServerForSpark
#http history的监听端口号,通过http://hadoop.master:18080访问  
spark.history.ui.port            18080
#Spark history日志位置
park.history.fs.logDirectory=hdfs://spark-master:8020/historyServerForSpark

  启动History Server,Web UI界面如下


  为了可以更清晰的看清楚Streaming运行的各个环节,我们可以通过将batchInterval的值设置的更大。例如5分钟。
  将程序上传至spark集群
  运行Spark程序
root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.OnlineBlackListFilter --master spark://spark-master:7077 ./spark.jar
通过nc -lk 9999命令发送一些数据,内容如下:
root@spark-master:~# nc -lk 9999
134343 Hadoop
343434 spark
3432777 Java
0983743 Hbase
893434 Mathou
程序输入结果为:
16/05/01 14:00:01 INFO scheduler.DAGScheduler: Job 3 finished: print at OnlineBlackListFilter.scala:63, took 0.098316 s
-------------------------------------------
Time: 1462082400000 ms
-------------------------------------------
3432777 Java
343434 spark
0983743 Hbase
  接下来,我们查看web ui中的内容,来解析SparkStreaming的运行过程。

  红色部分为我们刚刚运行的程序的日志(第一次运行时,在completed application这个地方看不到日志,在Show incomplete applications 这个地方显示了日志,可是此时程序已经退出了。)
  

  我们点击进去,查看详细信息:

  我们可以看到,这个程序在运行期间,启动了4个Job。
  先看看job id 为0 的详细信息

  这个job,很明显是我们定义的blackListRDD数据的生成。对应的代码为
  val blackList = Array(("Hadoop", true), ("Mathou", true))
//把Array变成RDD
val blackListRDD = ssc.sparkContext.parallelize(blackList)
  并且它做了reduceBykey的操作(代码中并没有此步操作,SparkStreaming框架自行生成的)。
  这里有两个Stage,Stage 0和Stage 1
  接下来我们看看Job 1的详细信息

此处也是一个makeRDD,这个RDD是receiver不断的接收数据流中的数据,在时间间隔达到Batch Interval后,将所有数据变成一个RDD。并且它的耗时也是最长的,59s
  特别说明:此处可以看出,receiver也是一个独立的job。由此我们可以得出一个结论:我们在应用程序中,可以启动多个job,并且不用的job之间可以相互配合,这就为我们编写复杂的应用程序打下了基础。

  我们点击上面的start at OnlineBlackListFilter.scala:64查看详细信息

  根据上图信息,只有一个Executor在接收数据,最最重要的是红色框中的数据本地性为PROCESS_LOCAL,由此可以知道receiver接收到数据后会保存到内存中,只要内存充足是不会写到内存中的。
  即便在创建receiver时,指定的存储默认策略为MEMORY_AND_DISK_SER_2
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
  我们再看看job 2的详细信息:

  Job 2 将前两个job生成的RDD进行leftOuterJoin操作。
  从Stage Id的编号就可以看出,它是依赖于上两个Job的。

  Receiver接收数据时是在spark-master节点上,但是Job 2在处理数据时,数据已经到了spark-worker1上了(因为我的环境只有两个worker,数据并没有分散到所有worker节点,worker节点如果多一点,情况可能不一样,每个节点都会处理数据)

  点击上面的Stage Id 3查看详细信息:

  在一个Executor上运行,并且有5个Task 。

  我们看看Job 3的详细信息:


  此处的DAG图和Job2的相同,但是Stage 6和7被跳过了。详细的原因,我们后面的课程会一一讲解。

  总结:我们可以看出,一个Batch Interval并不是单单触发一个Job。
  根据上面的描述,我们更细致的了解了DStream和RDD的关系了。DStream就是由一个个BatchInterval时间内的RDD组成的。只不过DStream带上了时间维度,是一个无边界的集合。


  对DStream的操作会构建成DStream Graph



  在每到Batch Interval时间间隔后,Job被触发,DStream Graph将会被转换成RDD Graph

  

备注:
资料来源于:DT_大数据梦工厂
更多私密内容,请关注微信公众号:DT_Spark
如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580
Life is short,you need to Spark!

可以参考博客:http://blog.csdn.net/andyshar/article/details/51295030




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669663-1-1.html 上篇帖子: 发福利喽!学Spark课程送Spark技术峰会的门票........ 下篇帖子: spark2.x由浅入深深到底系列六之RDD api reduceByKey与foldByKey对比
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表