设为首页 收藏本站
查看: 2526|回复: 0

[经验分享] Flume+Kafka+SparkStreaming整合

[复制链接]
累计签到:3 天
连续签到:1 天
发表于 2015-11-27 17:05:25 | 显示全部楼层 |阅读模式
  目录
  1.Flume介绍.2
  1.1 Flume数据源以及输出方式.2
  1.2 Flume的核心概念.2
  1.3 Flume结构.2
  1.4 Flume安装测试.3
  1.5 启动flume4
  2.Kafka介绍.4
  2.1 Kafka产生背景.4
  2.2 Kafka部署结构.4
  2.3 Kafka集群架构.4
  2.4 Kafka基本概念.5
  2.5 Kafka安装测试.5
  3.Flume和Kafka整合.6
  3.1两者整合优势.6
  3.2 Flume和Kafka整合安装.6
  3.3 启动kafka flume相关服务.7
  3.4 Kafka和SparkStreaming整合.8
  
1. Flume介绍

Flume是Cloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。


1.1 Flume数据源以及输出方式

Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用exec方式进行日志采集。

  Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。本测试研究中由kafka来接收数据。

1.2 Flume的核心概念

1.  Agent:使用JVM运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。

2.  Client:生产数据,运行在一个独立的线程。

3.  Source:从Client收集数据,传递给Channel。

4.  Sink:从Channel收集数据,运行在一个独立线程。

5.  Channel:连接 sources和 sinks ,这个有点像一个队列。

6.  Events :可以是日志记录、 avro对象等。

1.3 结构

  Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图:

DSC0000.jpg
Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、ContextualRouting、BackupRoutes。如下图所示:

DSC0001.jpg

DSC0002.jpg


1.4 安装测试

  解压apache-flume-1.6.0-bin.tar.gz:tar –zxvf apache-flume-1.6.0-bin.tar.gz
  cp conf/flume-conf.properties.template conf/exec.conf
  cp conf/flume-env.sh.template conf/flume-env.sh    配置JAVA_HOME
  exec.conf配置如下:
  
a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = exec

a2.sources.r2.channels = c2

a2.sources.r2.command=tail -n +0 -F /usr/local/hadoop/flume/test.log

# Describe the sink

a2.sinks.k2.type = logger

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

  
  验证安装:flume-ng version
DSC0003.jpg



  
1.5 启动flume

  flume-ng agent --conf ./flume/conf/ -f ./flume/conf/exec.conf-Dflume.root.logger=DEBUG,console -n a2
  发送数据和flume接收数据:
   DSC0004.jpg

DSC0005.jpg


  
  2.Kafka介绍
  2.1 产生背景
  Kafka 是分布式发布-订阅消息系统。它最初由 LinkedIn 公司开发,使用 Scala语言编写,之后成为 Apache 项目的一部分。Kafka是一个分布式的,可划分的,多订阅者,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。
  在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转。传统的企业消息系统并不是非常适合大规模的数据处理。为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志)Kafka 就出现了。Kafka 可以起到两个作用:
  降低系统组网复杂度
  降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka 承担高速数据总线的作用。
  
2.2 部署结构

DSC0006.jpg
  
2.3 集群架构

  
   DSC0007.jpg

  
  
  2.4 基本概念
  Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。
  Partition:Topic 物理上的分组,一个topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。
  Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。
  Producers:消息和数据生产者,向 Kafka的一个 topic 发布消息的过程叫做 producers。
  Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。
  Broker:缓存代理,Kafa 集群中的一台或多台服务器统称为broker。
  
2.5 安装测试

  解压Kafka: tar -xzf kafka_2.10-0.8.1.1.tgz
  启动ZK bin/zookeeper-server-start.shconfig/zookeeper.properties
启动服务bin/kafka-server-start.sh config/server.properties>/dev/null 2>&1&

  创建主题 bin/kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 1 --partitions 1 --topic test
  查看主题 bin/kafka-topics.sh --list --zookeeperlocalhost:2181
查看主题详情 bin/kafka-topics.sh--describe --zookeeper localhost:2181 --topic test

  删除主题 bin/kafka-run-class.shkafka.admin.DeleteTopicCommand --topic test --zookeeper localhost:2181
创建生产者bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

创建消费者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning


3.Flume和Kafka整合

3.1 两者整合优势

Flume更倾向于数据传输本身,Kakfa是典型的消息中间件用于解耦生产者消费者。

具体架构上,Agent并没把数据直接发送到Kafka,在Kafka前面有层由Flume构成的forward。这样做有两个原因:

Kafka的API对非JVM系的语言支持很不友好,forward对外提供更加通用的HTTP接口。

forward层可以做路由、Kafka topic和Kafkapartition key等逻辑,进一步减少Agent端的逻辑。

数据有数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算。本文实时计算采用SparkStreaming做测试。


3.2 Flume和Kafka整合安装

Flume和Kafka插件包下载:https://github.com/beyondj2ee/flumeng-kafka-plugin

提取插件中的flume-conf.properties文件:修改如下:flume源采用exec


producer.sources.s.type = exec

producer.sources.s.command=tail -f -n+1/usr/local/Hadoop/flume/test.log

producer.sources.s.channels = c


修改producer代理的topic为test

将配置放到flume/cong/producer.conf中

复制插件包中的jar包到flume/lib中:删除掉版本不同的相同jar包,这里需要删除scala-compiler-z.9.2.jar包,否则flume启动会出现问题。

DSC0008.jpg


复制kafka/libs中的jar包到flume/lib中。

完整producer.conf:


producer.conf:

#agentsection  

producer.sources= s  

producer.channels= c  

producer.sinks= r  

#sourcesection  

producer.sources.s.type= exec

#producer.sources.s.spoolDir= /usr/local/hadoop/flume/logs

#producer.sources.s.fileHeader= true

producer.sources.s.command= tail -f -n+1 /usr/local/hadoop/flume/aaa.log

producer.sources.s.channels= c  

# Eachsink's type must be defined  

producer.sinks.r.type= org.apache.flume.plugins.KafkaSink  

producer.sinks.r.metadata.broker.list=localhost:9092  

producer.sinks.r.partition.key=0  

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks=0  

producer.sinks.r.max.message.size=1000000  

producer.sinks.r.producer.type=sync  

producer.sinks.r.custom.encoding=UTF-8  

producer.sinks.r.custom.topic.name=test  

#Specifythe channel the sink should use  

producer.sinks.r.channel= c  

# Eachchannel's type is defined.  

producer.channels.c.type= memory  

producer.channels.c.capacity= 1000

producer.channels.c.transactionCapacity= 100



3.3 启动kafka flume相关服务

启动ZKbin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka服务 bin/kafka-server-start.sh config/server.properties

创建消费者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning

启动flume

flume-ng agent --conf./flume/conf/ -f ./flume/conf/producer.conf -Dflume.root.logger=DEBUG,console-n producer


向flume发送数据:

DSC0009.jpg


Kafka消费者数据:




DSC00010.jpg

3.4 Kafka和SparkStreaming整合

核心代码:


完整代码路径:

spark-1.4.0\examples\src\main\java\org\apache\spark\examples\streaming


DSC00011.jpg


DSC00012.jpg


执行参数:

DSC00013.jpg


发送数据:

由于flume采用exec数据源的方式,因此flume会监听配置的相应的文件: tail -f -n+1 /usr/local/Hadoop/flume/aaa.log

当向该文件追加文件时,flume就会获取追加的数据:

writetoflume.py

DSC00014.jpg


flume将获取的增量数据由sink发送给kafka,以下是kafka comsumer消费的数据



DSC00015.jpg



执行结果:

SparkStreaming订阅kafka的test主题的数据,将订阅的数据进行单词计数处理。





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144265-1-1.html 上篇帖子: Flume+storm+kafka 下篇帖子: Flume与Kafka整合
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表