设为首页 收藏本站
查看: 1177|回复: 0

[经验分享] flume+kafka+sparkstreaming搭建整合

[复制链接]

尚未签到

发表于 2017-5-21 13:57:48 | 显示全部楼层 |阅读模式
主要是数据从flume进去kafka,然后交给sparkstreaming处理的流程
本文依旧以单词计数例子为例
首先,flume使用1.6版本,如果是1.6以下的话,没带有官方的kafkasink,需要自己实现,自己实现也不难实现,写一个自定义的sink,在里面方法调用kafka生产者代码,把数据发送到指定的kafka的broker的topic即可。
此处使用1.6版本,直接使用kafkaSink即可
agent4.channels.ch1.type = memory
agent4.sources.avro-source1.channels = ch1
agent4.sources.avro-source1.type = avro
agent4.sources.avro-source1.bind = 0.0.0.0
agent4.sources.avro-source1.port = 41414
agent4.sinks.log-sink1.channel = ch1
agent4.sinks.log-sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent4.sinks.log-sink1.topic = test
agent4.sinks.log-sink1.brokerList = localhost:9092
agent4.sinks.log-sink1.requiredAcks = 1
agent4.sinks.log-sink1.batchSize = 20
agent4.channels = ch1
agent4.sources = avro-source1
agent4.sinks = log-sink1

然后启动flume即可 bin/flume-ng agent --conf ./conf/ -f conf/agent4 -Dflume.root.logger=DEBUG,console -n agent4
开始进行kafka的操作,这里都是单机部署,包括flume和kafka
首先启动zookeeper服务bin/zookeeper-server-start.sh config/zookeeper.properties
然后启动Kafkabin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
然后创建一个"test"的topic,一个分区一个副本bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可以查看一下所有主题,验证一下bin/kafka-topics.sh --list --zookeeper localhost:2181

然后开始写spark部分的代码
首先加入kafka的maven依赖<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.2.1</version>
</dependency>
代码如下:package cn.han
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka._
object MySparkkafka {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir","E:\\hadoop-2.6.0")
val sc = new SparkContext("local[2]", "Spark Streaming kafka Integration")  
//创建StreamingContext,3秒一个批次  
val ssc = new StreamingContext(sc, Seconds(4))  
val kafkaParams: Map[String, String] = Map("group.id" -> "terran")
val readParallelism = 5
val topics = Map("test" -> 1)
val sl=StorageLevel.MEMORY_ONLY
val kafkaStream=KafkaUtils.createStream(ssc,"192.168.5.220:2181","terran", topics, sl)

val rss2=kafkaStream.flatMap(x =>{
val by=x._2
val sb=by.split(" ")
sb
})
val rdd3=rss2.map(x=>(x,1))
val rdd4=rdd3.reduceByKey(_+_)
rdd4.print()
//开始运行  
ssc.start()  
//计算完毕退出  
ssc.awaitTermination()  
sc.stop()  
}
}

还需要一个简单的log4j程序,向flume写入测试数据,上一篇博客已经引入,这里就不再赘述了。最终,执行spark代码即可。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379634-1-1.html 上篇帖子: flume与kafka集成 下篇帖子: flume ng1.6 +kafka sink
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表