lkjhgd 发表于 2016-7-7 08:58:10

logstash将Kafka中的日志数据订阅到HDFS

前言:通常情况下,我们将Kafka的日志数据通过logstash订阅输出到ES,然后用Kibana来做可视化分析,这就是我们通常用的ELK日志分析模式。但是基于ELK的日志分析,通常比较常用的是实时分析,日志存个十天半个月都会删掉。那么在一些情况下,我需要将日志数据也存一份到我HDFS,积累到比较久的时间做半年、一年甚至更长时间的大数据分析。下面就来说如何最简单的通过logstash将kafka中的数据订阅一份到hdfs。

一:安装logstash(下载tar包安装也行,我直接yum装了)

1
#yum install logstash-2.1.1





二:从github上克隆代码

1
2
3
#git clonehttps://github.com/heqin5136/logstash-output-webhdfs-discontinued.git
#ls
logstash-output-webhdfs-discontinued





三:安装logstash-output-webhdfs插件

1
2
3
#cd logstash-output-webhdfs-discontinued
logstash的bin目录下有个plugin,使用plugin来安装插件
#/opt/logstash/bin/plugin install logstash-output-webhdfs






四:配置logstash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#vim /etc/logstash/conf.d/logstash.conf
input {
kafka {
    zk_connect => '10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181'   #kafka的zk集群地址
    group_id => 'hdfs'                     #消费者组,不要和ELK上的消费者一样
    topic_id => 'apiAppWebCms-topic'       #topic
    consumer_id => 'logstash-consumer-10.10.8.8'   #消费者id,自定义,我写本机ip。
    consumer_threads => 1
    queue_size => 200
    codec => 'json'
}
}

output {            
#如果你一个topic中会有好几种日志,可以提取出来分开存储在hdfs上。
if == "apiNginxLog" {
    webhdfs {
         workers => 2
         host => "10.10.8.1"      #hdfs的namenode地址   
         port => 50070            #webhdfs端口
         user => "hdfs"             #hdfs运行的用户啊,以这个用户的权限去写hdfs。
         path => "/data/logstash/apiNginxLog-%{+YYYY}-%{+MM}-%{+dd}/logstash-%{+HH}.log
             #按天建目录,按小时建log文件。
         flush_size => 500
#       compression => "snappy"             #压缩格式,可以不压缩
      idle_flush_time => 10
      retry_interval => 0.5
       }
   }
if == "apiAppLog" {
    webhdfs {
      workers => 2
      host => "10.64.8.1"
      port => 50070
      user => "hdfs"
      path => "/data/logstash/api/apiAppLog-%{+YYYY}-%{+MM}-%{+dd}.log"
      flush_size => 500
#      compression => "snappy"
      idle_flush_time => 10
      retry_interval => 0.5
       }
   }
stdout { codec => rubydebug }
}





五:启动logstash

1
#/etc/init.d/logstash start





已经可以成功写入了。




页: [1]
查看完整版本: logstash将Kafka中的日志数据订阅到HDFS