设为首页 收藏本站
查看: 1756|回复: 0

[经验分享] 【Spark六十一】Spark Streaming结合Flume、Kafka进行日志分析

[复制链接]

尚未签到

发表于 2017-5-22 07:19:17 | 显示全部楼层 |阅读模式
  第一步,Flume和Kakfa对接,Flume抓取日志,写到Kafka中
  第二部,Spark Streaming读取Kafka中的数据,进行实时分析
  本文首先使用Kakfa自带的消息处理(脚本)来获取消息,走通Flume和Kafka的对接

1. Flume配置
  1. 下载Flume和Kafka集成的插件,下载地址:https://github.com/beyondj2ee/flumeng-kafka-plugin。将package目录中的flumeng-kafka-plugin.jar拷贝到Flume安装目录的lib目录下
  2. 将Kakfa安装目录libs目录下的如下jar包拷贝到Flume安装目录的lib目录下
  kafka_2.10-0.8.1.1.jar
  scala-library-2.10.1.jar
  metrics-core-2.2.0.jar
  3.添加agent配置

producer.sources = s
producer.channels = c
producer.sinks = r
#source section   
#producer.sources.s.type = seq   
producer.sources.s.type = netcat
producer.sources.s.bind = localhost
producer.sources.s.port = 44444
producer.sources.s.channels = c
# Each sink's type must be defined   
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
##定义Kafka接收消息的Topic的名字
producer.sinks.r.custom.topic.name=test
#Specify the channel the sink should use   
producer.sinks.r.channel = c
# Each channel's type is defined.   
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
  3.1 上面指定了sink的类型为KafkaSink,目的是将日志送往Kafka消息队列,分区类为SinglePartition
  3.2  指定topic的名字为test
  3.3 指定Flume的消息源来自于netcat,(localhost,44444)
  4. 启动Flume

./flume-ng agent -f ../conf/kafka.conf  -c . -n producer
  指定配置文件和agent的名字

Kafka配置
  5. 启动Kafka

./kafka-server-start.sh ../config/server.properties
  5.1 启动Kafka依赖的Zookeeper,添加topic名字为test,详见
  5.2 启动Kakfa的消息接收进程

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
  6.启动telnet,输入netcat接受的数据

telnet localhost 44444

数据流转过程
  1. 在telnet终端输入数据,被Flume的source接受
  2. Flume将数据写入到Kafka消息队列中,在Flume_Kafka的插件中有向Kafka发送消息的逻辑
  3. Kafka消息消费者,监听到Kafka队列中来了消息,那么就在Kakfa的消息接收端看到控制台上有输出

问题:
  1. 此处Kafka使用SinglePartition的方式接收消息,如果是Kafka集群,那么Flume如何写入消息到一个topic的多个partition中
  2. Flume的消息源是监听端口44444实现的,如何监听日志文件呢?日志文件可以自动增长,另外也会自动的创建新的日志文件,这用Kafka如何处理?
  对于监听日志文件,应该使用Flume结合Log4J的方式,有个专门针对Flume的Log4J Appender,可以将写入到文件的内容通过Appender发送给Flume作为数据源,Flume的源收到数据后,就可以通过Channel发送给Sink(此处的Sink是KafkaSingk)

关于Kafka的Partition
  1. 第一个问题,SinglePartition的实现

package org.apache.flume.plugins;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SinglePartition implements Partitioner<String> {
public SinglePartition(VerifiableProperties props) {
}
@Override
public int partition(String key, int numberOfPartions) {
return 0;
}
}
  可见,只要把partition方法实现为 key.hashCode()%numberOfPartitions即可
  2. 第二个问题,如何设置Kafka的一个topic几个partition?
  在创建topic时,就需要指定partition的个数

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  建立一个分区数为17,复制因为为3的topic,看看zk上记录了哪些信息,

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 17 --topic test
  2.1. 报错:也就是说,复制因子不能比brokers的个数大

[hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 17 --topic test_many_partitions
Error while executing topic command replication factor: 3 larger than available brokers: 1
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:86)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:50)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
  2.2 新建了topic后,Kafka server日志显示

[2015-02-14 02:53:53,526] INFO Completed load of log test_many_partitions-4 with log end offset 0 (kafka.log.Log)
[2015-02-14 02:53:53,526] INFO Created log for partition [test_many_partitions,4] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2015-02-14 02:53:53,527] WARN Partition [test_many_partitions,4] on broker 0: No checkpointed highwatermark is found for partition [test_many_partitions,4] (kafka.cluster.Partition)
[2015-02-14 02:53:53,540] INFO Completed load of log test_many_partitions-13 with log end offset 0 (kafka.log.Log)
[2015-02-14 02:53:53,541] INFO Created log for partition [test_many_partitions,13] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2015-02-14 02:53:53,541] WARN Partition [test_many_partitions,13] on broker 0: No checkpointed highwatermark is found for partition [test_many_partitions,13] (kafka.cluster.Partition)
[2015-02-14 02:53:53,554] INFO Completed load of log test_many_partitions-1 with log end offset 0 (kafka.log.Log)
[2015-02-14 02:53:53,555] INFO Created log for partition [test_many_partitions,1] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2015-02-14 02:53:53,555] WARN Partition [test_many_partitions,1] on broker 0: No checkpointed highwatermark is found for partition [test_many_partitions,1] (kafka.cluster.Partition)
  3.3 查看zk上关于具有多partition的topic,结果如下:
  17个partition

[zk: localhost:2181(CONNECTED) 26] ls /brokers/topics
[test_many_partitions, test]
[zk: localhost:2181(CONNECTED) 27] ls /brokers/topics/test_many_partitions
[partitions]
[zk: localhost:2181(CONNECTED) 28] ls /brokers/topics/test_many_partitions/partitions
[15, 16, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8]
[zk: localhost:2181(CONNECTED) 29]
  1个partition

[zk: localhost:2181(CONNECTED) 30] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 31] ls /brokers/topics/test/partitions
[0]
  参考:
  https://github.com/beyondj2ee/flumeng-kafka-plugin
  http://blog.csdn.net/weijonathan/article/details/18301321
  http://liyonghui160com.iteye.com/blog/2173235

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379690-1-1.html 上篇帖子: Flume+Kafka+Strom基于伪分布式环境的结合使用 下篇帖子: kafka use
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表