设为首页 收藏本站
查看: 1241|回复: 0

[经验分享] flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2016-11-22 09:53:36 | 显示全部楼层 |阅读模式
一、Hadoop配置安装


注意:apache提供的hadoop-2.6.0的安装包是在32位操作系统编译的,因为hadoop依赖一些C++的本地库,
所以如果在64位的操作上安装hadoop-2.4.1就需要重新在64操作系统上重新编译

1.修改Linux主机名
2.修改IP
3.修改主机名和IP的映射关系
######注意######如果你们公司是租用的服务器或是使用的云主机(如华为用主机、阿里云主机等)
/etc/hosts里面要配置的是内网IP地址和主机名的映射关系
4.关闭防火墙
5.ssh免登陆
6.安装JDK,配置环境变量等


集群规划:
主机名IP安装的软件              运行的进程
hadoop01192.168.1.201jdk、hadoopNameNode、DFSZKFailoverController

(zkfc)
hadoop02192.168.1.202jdk、hadoopNameNode、DFSZKFailoverController

(zkfc)
hadoop03192.168.1.203jdk、hadoopResourceManager
hadoop04192.168.1.204jdk、hadoopResourceManager
hadoop05192.168.1.205jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、

QuorumPeerMain
hadoop06192.168.1.206jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、

QuorumPeerMain
hadoop07192.168.1.207jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、

QuorumPeerMain

说明:
1.在hadoop2.0中通常由两个NameNode组成,一个处于active状态,另一个处于standby状态。Active NameNode对外提供服务,而Standby

NameNode则不对外提供服务,仅同步active namenode的状态,以便能够在它失败时快速进行切换。
hadoop2.0官方提供了两种HDFS HA的解决方案,一种是NFS,另一种是QJM。
这里我们使用简单的QJM。在该方案中,主备NameNode之间通过一组JournalNode同步元数据信息,一条数据只要成功写入多数JournalNode即认为写入成

功。通常配置奇数个JournalNode
这里还配置了一个zookeeper集群,用于ZKFC(DFSZKFailoverController)故障转移,当Active NameNode挂掉了,会自动切换Standby

NameNode为standby状态
2.hadoop-2.2.0中依然存在一个问题,就是ResourceManager只有一个,存在单点故障,hadoop-2.4.1解决了这个问题,有两个

ResourceManager,一个是Active,一个是Standby,状态由zookeeper进行协调
安装步骤:
1.安装配置zooekeeper集群(在hadoop05上)
1.1解压
tar -zxvf zookeeper-3.4.5.tar.gz -C /hadoop/
1.2修改配置
cd /hadoop/zookeeper-3.4.5/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
修改:dataDir=/hadoop/zookeeper-3.4.5/tmp
在最后添加:
server.1=hadoop05:2888:3888
server.2=hadoop06:2888:3888
server.3=hadoop07:2888:3888
保存退出
然后创建一个tmp文件夹
mkdir /hadoop/zookeeper-3.4.5/tmp
再创建一个空文件
touch /hadoop/zookeeper-3.4.5/tmp/myid
最后向该文件写入ID
echo 1 > /hadoop/zookeeper-3.4.5/tmp/myid
1.3将配置好的zookeeper拷贝到其他节点(首先分别在hadoop06、hadoop07根目录下创建一个hadoop目录:mkdir /hadoop)
scp -r /hadoop/zookeeper-3.4.5/ hadoop06:/hadoop/
scp -r /hadoop/zookeeper-3.4.5/ hadoop07:/hadoop/

注意:修改hadoop06、hadoop07对应/hadoop/zookeeper-3.4.5/tmp/myid内容
hadoop06:
echo 2 > /hadoop/zookeeper-3.4.5/tmp/myid
hadoop07:
echo 3 > /hadoop/zookeeper-3.4.5/tmp/myid

2.安装配置hadoop集群(在hadoop01上操作)
2.1解压
tar -zxvf hadoop-2.4.1.tar.gz -C /hadoop/
2.2配置HDFS(hadoop2.0所有的配置文件都在$HADOOP_HOME/etc/hadoop目录下)
#将hadoop添加到环境变量中
vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.7.0_55
export HADOOP_HOME=/hadoop/hadoop-2.4.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin

#hadoop2.0的配置文件全部在$HADOOP_HOME/etc/hadoop下
cd /hadoop/hadoop-2.6.0/etc/hadoop

2.2.1修改hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.7.0_55

2.2.2修改core-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<configuration>
<!-- 指定hdfs的nameservice为wxm -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://wxm</value>
</property>
<!-- 指定hadoop临时目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/root/wxm/software/hadoop/hadoop-2.6.5/tmp</value>
</property>
<!-- 指定zookeeper地址 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
</configuration>




2.2.3修改hdfs-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<configuration>
<!--指定hdfs的nameservice为wxm,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>wxm</value>
</property>
<!-- ns1下面有两个NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.wxm</name>
<value>hadoop05,hadoop06</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.wxm.hadoop05</name>
<value>hadoop05:9000</value>
</property>



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.wxm.hadoop051</name>
<value>hadoop05:50070</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.wxm.hadoop06</name>
<value>hadoop06:9000</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.wxm.hadoop06</name>
<value>hadoop06:50070</value>
</property>
<!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop05:8485;hadoop06:8485/wxm</value>
</property>
<!-- 指定JournalNode在本地磁盘存放数据的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/root/wxm/software/hadoop/hadoop-2.6.5/journal</value>
</property>
<!-- 开启NameNode失败自动切换 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- 配置失败自动切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.wxm</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>
<!-- 使用sshfence隔离机制时需要ssh免登陆 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<!-- 配置sshfence隔离机制超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>300000</value>
</property>
</configuration>





2.2.4修改mapred-site.xml
1
2
3
4
5
6
7
<configuration>
<!-- 指定mr框架为yarn方式 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>






2.2.5修改yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<configuration>
<!-- 开启RM高可靠 -->
<property>
   <name>yarn.resourcemanager.ha.enabled</name>
   <value>true</value>
</property>
<!-- 指定RM的cluster id -->
<property>
   <name>yarn.resourcemanager.cluster-id</name>
   <value>yrc</value>
</property>
<!-- 指定RM的名字 -->
<property>
   <name>yarn.resourcemanager.ha.rm-ids</name>
   <value>resourcemanagerAtWorker1,resourcemanagerAtWorker1</value>
</property>
<!-- 分别指定RM的地址 -->
<property>
   <name>yarn.resourcemanager.hostname.resourcemanagerAtWorker1</name>
   <value>worker1</value>
</property>
<property>
   <name>yarn.resourcemanager.hostname.resourcemanagerAtWorker2</name>
   <value>worker2</value>
</property>
<!-- 指定zk集群地址 -->
<property>
   <name>yarn.resourcemanager.zk-address</name>
   <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
</property>
</configuration>






2.2.6修改slaves(slaves是指定子节点的位置,因为要在hadoop01上启动HDFS、在hadoop03启动yarn,所以hadoop01上的

slaves文件指定的是datanode的位置,hadoop03上的slaves文件指定的是nodemanager的位置)
hadoop05
hadoop06
hadoop07

2.2.7配置免密码登陆
#首先要配置hadoop01到hadoop02、hadoop03、hadoop04、hadoop05、hadoop06、hadoop07的免密码登


#在hadoop01上生产一对钥匙
ssh-keygen -t rsa
#将公钥拷贝到其他节点,包括自己
ssh-coyp-id hadoop01
ssh-coyp-id hadoop02
ssh-coyp-id hadoop03
ssh-coyp-id hadoop04
ssh-coyp-id hadoop05
ssh-coyp-id hadoop06
ssh-coyp-id hadoop07
#配置hadoop03到hadoop04、hadoop05、hadoop06、hadoop07的免密码登陆
#在hadoop03上生产一对钥匙
ssh-keygen -t rsa
#将公钥拷贝到其他节点
ssh-coyp-id hadoop04
ssh-coyp-id hadoop05
ssh-coyp-id hadoop06
ssh-coyp-id hadoop07
#注意:两个namenode之间要配置ssh免密码登陆,别忘了配置hadoop02到hadoop01的免登陆
在hadoop02上生产一对钥匙
ssh-keygen -t rsa
ssh-coyp-id -i hadoop01

2.4将配置好的hadoop拷贝到其他节点
scp -r /hadoop/ hadoop02:/
scp -r /hadoop/ hadoop03:/
scp -r /hadoop/hadoop-2.4.1/ hadoop@hadoop04:/hadoop/
scp -r /hadoop/hadoop-2.4.1/ hadoop@hadoop05:/hadoop/
scp -r /hadoop/hadoop-2.4.1/ hadoop@hadoop06:/hadoop/
scp -r /hadoop/hadoop-2.4.1/ hadoop@hadoop07:/hadoop/
###注意:严格按照下面的步骤
2.5启动zookeeper集群(分别在hadoop05、hadoop06、hadoop07上启动zk)
cd /hadoop/zookeeper-3.4.5/bin/
./zkServer.sh start
#查看状态:一个leader,两个follower
./zkServer.sh status

2.6启动journalnode(分别在在hadoop05、hadoop06、tcast07上执行)
cd /hadoop/hadoop-2.4.1
sbin/hadoop-daemon.sh start journalnode
#运行jps命令检验,hadoop05、hadoop06、hadoop07上多了JournalNode进程

2.7格式化HDFS
#在hadoop01上执行命令:
hdfs namenode -format
#格式化后会在根据core-site.xml中的hadoop.tmp.dir配置生成个文件,这里我配置的是/hadoop/hadoop-2.4.1/tmp,然后

将/hadoop/hadoop-2.4.1/tmp拷贝到hadoop02的/hadoop/hadoop-2.4.1/下。
scp -r tmp/ hadoop02:/hadoop/hadoop-2.4.1/

2.8格式化ZK(在hadoop01上执行即可)
hdfs zkfc -formatZK

2.9启动HDFS(在hadoop01上执行)
sbin/start-dfs.sh

                2.11启动zkfc
hadoop-daemon.sh start zkfc

2.10启动YARN(#####注意#####:是在hadoop03上执行start-yarn.sh,把namenode和resourcemanager分开是因为性能问题,因为

他们都要占用大量资源,所以把他们分开了,
                      他们分开了就要分别在不同的机器上启动)
sbin/start-yarn.sh


到此,hadoop-2.4.1配置完毕,可以统计浏览器访问:
http://192.168.1.201:50070
NameNode 'hadoop01:9000' (active)
http://192.168.1.202:50070
NameNode 'hadoop02:9000' (standby)

验证HDFS HA
首先向hdfs上传一个文件
hadoop fs -put /etc/profile /profile
hadoop fs -ls /
然后再kill掉active的NameNode
kill -9 <pid of NN>
通过浏览器访问:http://192.168.1.202:50070
NameNode 'hadoop02:9000' (active)
这个时候hadoop02上的NameNode变成了active
在执行命令:
hadoop fs -ls /
-rw-r--r--   3 root supergroup       1926 2014-02-06 15:36 /profile
刚才上传的文件依然存在!!!
手动启动那个挂掉的NameNode
sbin/hadoop-daemon.sh start namenode
通过浏览器访问:http://192.168.1.201:50070
NameNode 'hadoop01:9000' (standby)

验证YARN:
运行一下hadoop提供的demo中的WordCount程序:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount /profile /out

OK,大功告成!!!

二、spark安装
    standalone模式搭建比较简单  参考官网即可

三、kafka集群搭建
    1.  kafka2.11下载并解压


    2.  修改配置文件


    · config/server.properties
        broker.id=4(集群里的id不能重复,我是取每台机器IP最后一位)
        listeners=PLAINTEXT://192.168.248.134:9092(格式不变,绑定本机IP)
        log.dirs=/home/hadoop/kafka/logs4kafka(日志路径)
        zookeeper.connect=h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181
最后一项时延可以增大一些,曾经因为超过时延而报错,修改成10倍就OK了
     ·producer.properties
        bootstrap.servers=h2:9092,h3:9092,h4:9092,h8:9092,h9:9092,h10:9092
     ·consumer.properties
        zookeeper.connect=h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181
        group不配的话就是默认,看需求
    3.拷贝到其他节点,注意修改listeners绑定的IP和broker.id


    4 基本命令

    #start
    bin/kafka-server-start.sh config/server.properties

    #create a topic
    bin/kafka-topics.sh --create --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --replication-factor 5 --partition 5 --topic T20161021

    #list all topics
    bin/kafka-topics.sh --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --list

    #describe the detail of this topic
    bin/kafka-topics.sh --describe --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181  --topic test101

    #write some Messages to the topic
    bin/kafka-console-producer.sh --broker-list h2:9092 --topic T20161021

    #recerive the message producer writed
    bin/kafka-console-consumer.sh --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --topic T20161021 --from-beginning

四、flume-ng 1.6安装配置

    1.下载安装并配置好flume的运行环境

    2.编写配置文件

    # ------------------- 定义数据流----------------------
    agent.sources = kafkaSource
    agent.channels = memoryChannel
    agent.sinks = hdfsSink
    agent.sources.kafkaSource.channels = memoryChannel
    agent.sinks.hdfsSink.channel = memoryChannel

    #-------- kafkaSource相关配置-----------------
    agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafkaSource.zookeeperConnect = h2:2181,h3:2181,h4:2181
    agent.sources.kafkaSource.topic = T20161031
    #agent.sources.kafkaSource.groupId = flume
    agent.sources.kafkaSource.kafka.consumer.timeout.ms = 1000

    #------- memoryChannel相关配置-------------------------
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity=10000
    agent.channels.memoryChannel.transactionCapacity=1000
    #---------hdfsSink 相关配置------------------
    agent.sinks.hdfsSink.type = hdfs
    agent.sinks.hdfsSink.hdfs.path = hdfs://h4:9000/user/test/kafka2HdfsByFlume
    agent.sinks.hdfsSink.hdfs.writeFormat = Text
    agent.sinks.hdfsSink.hdfs.fileType = DataStream

    3.启动Flume
    bin/flume-ng agent --conf conf --conf-file conf/kafka2HdfsByFlume.conf --name agent -Dflume.root.logger=INFO,console

五、sparkStreaming1.6.2

1
2
3
4
   
import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.*;import scala.Tuple2;import java.util.Arrays;public class StreamingOnHDFS {public static void main(String[] args){final SparkConf conf = newSparkConf().setMaster("spark://h4:7077").setAppName("SparkStreamingOnHDFS") ;Durations.seconds(5);//Durations.seconds(5) 设置每隔 5 秒final String checkpointDirectory = "hdfs://h4:9000/library/SparkStreaming/Checkpoint_Data";JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {@Overridepublic JavaStreamingContext create() {return createContext(checkpointDirectory, conf) ;}} ;JavaStreamingContext jsc = JavaStreamingContext. getOrCreate(checkpointDirectory, factory) ;JavaDStream lines = jsc.textFileStream("hdfs://h4:9000/user/test/kafka2HdfsByFlume") ;
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String line) throws Exception {return Arrays. asList(line.split(" ")) ;}}) ;JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String,Integer>() {public Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1) ;}}) ;JavaPairDStream<String, Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer,Integer>() {public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2;}}) ;wordscount.print();jsc.start() ;jsc.awaitTermination() ;jsc.close() ;}
private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf){System. out.println("==========Creating new context==============") ;JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations. seconds(5)) ;ssc.checkpoint(checkpointDirectory) ;return ssc;}}



六、测试

    1.将程序打成runnable jar并上传

    2.启动zookeeper、hadoop、spark、kafka、flume
    3.进入spark安装目录下执行

    bin/spark-submit --class com.unisk.spark.sparkStreaming.StreamingOnHDFS --master spark://h4:7077 /home/hadoop/spark/Apps4Spark/StreamingOnHDFS.jar

七、扩展
   数据导入HDFS之后即可进行基于MR/spark的批处理或准实时的流式处理,可作海量数据的清洗工作或预处理,结果可存入Hbase、redis或RDBMS以做进一步的展示或挖掘

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-303869-1-1.html 上篇帖子: 排查logstash2.4升级到5.0版本后kafka不兼容问题 下篇帖子: Apache Kafka 0.10.1.0 发布,大量更新
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表