设为首页 收藏本站
查看: 1579|回复: 0

[经验分享] 【Spark七十四】Spark Streaming整合Flume-NG二

[复制链接]

尚未签到

发表于 2017-5-22 06:23:22 | 显示全部楼层 |阅读模式
  在http://bit1129.iteye.com/blog/2184467一文中对Spark Streaming整合Flume-NG进行了基本的配置,并且Spark Streaming能够监听到来自于Flume的数据输出(通过Sink),不过代码很简单同时也是在单机上(Master和Worker在同一台机器上)进行试验的,因而还有有几个问题没有解决,本文继续Spark Streaming整合Flume-NG

package spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
object SparkFlumeNGWordCount {
def main(args : Array[String]) {
val conf = new SparkConf().setAppName("SparkFlumeNGWordCount")
val ssc = new StreamingContext(conf, Seconds(10))
//9999端口是由这个Spark负责开启并监听的,Spark Streaming采集写到这个端口的数据
//问题:如果这个代码运行在集群中,那么localhost指的是driver所在的IP,还是每个worker所在的IP
//即每个worker都会启动这个端口?这很重要因为它将影响Flume的配置(Flume的Sink就是9999端口)
///答案:只是一个Worker的IP,那么问题是如何知道Receiver在哪个Worker Node上启动?做法时先启动Spark Streaming,然后确定Receiver在哪个Node上启动
val lines = FlumeUtils.createStream(ssc,"localhost",9999)
lines.cache();
//lines是DStream对象,它的每个元素是SparkFlumeEvent对象,可以将它转换为字符窜(evt.event.getBody.array())
// 打印显示前10行的字符串
lines.map(evt => {
val str = new String(evt.event.getBody.array())
//打印到控制的文本内容
("string received: "+ str)
//print方法是action,也就是让map的转换操作运行,必须调用action
}).print()
//保存到磁盘文件
lines.map(evt => {
val str = new String(evt.event.getBody.array())
//保存到磁盘文件的内容
("string received: "+ str)
}).saveAsTextFiles("file:///home/hadoop/flumeoutput", "suff")
lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis() ).print()
ssc.start()
ssc.awaitTermination();
}
}

  例子中,Spark Streaming对接收到来自于Flume的输入(通过9999端口)一方面打印到控制台(通过print算子,print只打印前10行),另一方面通过saveAsTextFiles写到磁盘文件中,这验证了一个问题,进行Flume输出的数据进行了包装,包装有headers,有body的JSON串,但是通过SparkFlumeEnvent.event.getBody.array()还是很容易的获取到真正的数据,获取到用户数据后可以对数据进行操作
  有几个问题需要解决:
  1. FlumeUtils的createStream方法有两个参数,host和port,那么这个host和port只会在Driver所在机器上开启监听,还是在所有Workers上也会监听。这个问题实质上是要回答这样一个问题,如下代码是在哪里执行

val lines = FlumeUtils.createStream(ssc,"localhost",9999)
  因为这是main函数,即DriverProgram,我认为上面的代码应该在Driver上运行,  因为Driver的目的,一是构造RDD以及相应的DAG,然后提交作业,作业中的Task是例子中的print()和saveAsTextFiles触发Job提交,然后再划分Stage形成TaskSet创建的,这些Task是与val lines = FlumeUtils.createStream(ssc,"localhost",9999)代码无关的,所以,这个application只会在Driver上开启9999端口
  答:这个认为是错误的,即9999是在Worker上监听的,也就是数据直接流向Worer节点了,而输出是在Driver上。这也就可以理解,Spark RDD的数据本地性了,所有的数据都在本地计算。
  同时发现了一个现象:
  三台虚机(一主两从),提交application的时候,指定的参数--total-executor-cores 为2,当集群启动后,发现Master分配了两个core,两个Slave也分配两个,但是由于物理机只有4个core,因此,两个Slave真正的core个数是每个Slave1个,
  之前提到过1个core无法运行spark streaming程序,因为关掉一个虚机,采用一主一从每个分配2个core的方式运行。
  是否可以认为--total-executor-cores参数的意义是给集群中的每个节点分配这么core(如果某个节点core不够,那么就有几个分配几个)
  关于hostname和9999的详细说明:

1. Spark will listen on the given port for Flume to push data into it.
2. When in local mode, it will listen on localhost:9999
3. When in some kind of cluster, instead of localhost you will have to give the hostname of the cluster node where you want Flume to forward the data. Spark will launch the Flume receiver on that node (assuming the hostname matching is correct), and list on port 9999, for receiving data from Flume. So only the configured machine will listen on port 9999.
  2. 上例中,saveAsTextFiles,是DStream的一个方法。 注意Files是复数形式,即会产生多个文件目录,多个文件是指DStream中的每个RDD都会调用其saveAsFile方法,也就是每个RDD都会产生一个文件目录

  //prefix:
//suffix:
def saveAsTextFiles(prefix: String, suffix: String = "") {
///saveFunc方法,time入参是rdd构造时的时间,因此每个RDD都不同
val saveFunc = (rdd: RDD[T], time: Time) => {
///根据前缀,后缀以及RDD的标识创建RDD的名字
val file = rddToFileName(prefix, suffix, time)
///每个RDD都保存在不同的文件目录中
rdd.saveAsTextFile(file)
}
//对于DStream中的每个RDD,调用saveFunc函数
this.foreachRDD(saveFunc)
}
  格式是prefix-时间戳.suff,如下所示

[hadoop@hadoop ~]$ pwd
/home/hadoop
[hadoop@hadoop ~]$ ls -l
drwxrwxr-x   2 hadoop hadoop       41 Feb 20 21:15 flumeoutput-1424484950000.suff
drwxrwxr-x   2 hadoop hadoop       41 Feb 20 21:16 flumeoutput-1424484960000.suff
drwxrwxr-x   2 hadoop hadoop       41 Feb 20 21:16 flumeoutput-1424484970000.suff
drwxrwxr-x   2 hadoop hadoop       80 Feb 20 21:16 flumeoutput-1424484980000.suff
drwxrwxr-x   2 hadoop hadoop       80 Feb 20 21:16 flumeoutput-1424484990000.suff
drwxrwxr-x   2 hadoop hadoop      119 Feb 20 21:16 flumeoutput-1424485000000.suff
drwxrwxr-x   2 hadoop hadoop       41 Feb 20 21:16 flumeoutput-1424485010000.suff
  因此在定义prefix时,最后定义为多级目录,不要写成/home/hadoop/flumeoutput,应该写成/home/hadoop/flumeoutput/appname/flume
  关于Spark Streaming接收到FlumeNG发送来的Avro数据的处理:

val events = FlumeUtils.createStream(ssc, receiverHostname, receiverPort)
val lines = events.map{e => new String(e.event.getBody().array(), "UTF-8")}
   events是一个DStream,其中的每个元素是SparkFlumeEvents对象,SparkFlumeEvents.event获取到AvroFlumeEvent对象,AvroFlumeEvent的getBody方法获取到数据Body,array()方法转换为字节数组

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379678-1-1.html 上篇帖子: 【Spark七十五】Spark Streaming整合Flume-NG三之接入log4j 下篇帖子: Flume + HDFS Sink采集数据及如何添加第三方JAR
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表