
[Experience] Setting up and debugging Flume



Posted on 2015-11-27 20:39:13
Installing CDH3
  https://ccp.cloudera.com/display/CDHDOC/CDH3+Installation
  

wget http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo -O /etc/yum.repos.d/cloudera.repo
yum search hadoop
yum -y install hadoop-0.20
yum -y install hadoop-0.20-namenode
yum -y install hadoop-0.20-datanode
#yum -y install hadoop-0.20-secondarynamenode
yum -y install hadoop-0.20-jobtracker
yum -y install hadoop-0.20-tasktracker
  

Installing CDH3 Components


https://ccp.cloudera.com/display/CDHDOC/CDH3+Installation#CDH3Installation-InstallingCDH3Components


Install each component with "yum install <package>":

Component / package
---------------------------------

Flume flume

Sqoop sqoop

Hue hue

Pig hadoop-pig

Hive hadoop-hive

HBase hadoop-hbase

ZooKeeper hadoop-zookeeper

Oozie server oozie

Oozie client oozie-client

Whirr whirr

Snappy hadoop-0.20-native

Mahout mahout


Flume is split into:


flume: the core

flume-node: init script to run the node service

flume-master: init script to run the master service


yum install flume*


[iyunv@flume-hadoop-node-1 ~]# flume
usage: flume command [args...]
commands include:
dump            Takes a specified source and dumps to console
source          Takes a specified source and dumps to console
node            Start a Flume node/agent (with watchdog)
master          Start a Flume Master server (with watchdog)
version         Dump flume build version information
node_nowatch    Start a flume node/agent (no watchdog)
master_nowatch  Start a Flume Master server (no watchdog)
class <class>   Run specified fully qualified class using Flume environment (no watchdog)
ex: flume com.cloudera.flume.agent.FlumeNode
classpath       Dump the classpath used by the java executables
shell           Start the flume shell
killmaster      Kill a running master
dumplog         Takes a specified WAL/DFO log file and dumps to console
sink            Start a one-shot flume node with console source and specified sink

cd /etc/flume/conf
mv flume-site.xml.template flume-site.xml
vi flume-site.xml
# change the master host setting to your own host
/etc/init.d/flume-master  start
/etc/init.d/flume-node start
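The edit to flume-site.xml amounts to pointing the node at your master. A minimal sketch, assuming the flume.master.servers property (the same one used in the multi-master section later) and a master at 10.129.8.125:

```xml
<property>
  <name>flume.master.servers</name>
  <!-- replace with your own master's hostname or IP -->
  <value>10.129.8.125</value>
</property>
```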

Flume documentation


http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html


Overall, Flume has a stream-oriented design: a "source" produces events and a "sink" consumes them. Both push and pull are supported, and it can be extended with all kinds of data sources and processing steps, which makes it very flexible.


First stop the services and run Flume in the foreground, which makes it easy to watch the output and get an intuitive feel for how it works.


/etc/init.d/flume-master stop && /etc/init.d/flume-node stop

Start Flume:


flume dump console

Once it is running, anything you type is echoed back by Flume. Because we passed console on the command line, the source is console input; the default sink is the console as well.


Using a file as the source:

flume dump 'text("/etc/services")'


Tailing the end of a file:

flume dump 'tail("testfile")'


testfile does not have to exist yet; that is fine. In another console, create the file and add some content:


[iyunv@flume-hadoop-node-1 tmp]# echo "test flume">testfile
[iyunv@flume-hadoop-node-1 tmp]# echo "test flume 123">testfile
[iyunv@flume-hadoop-node-1 tmp]# echo "test flume 123">>testfile
[iyunv@flume-hadoop-node-1 tmp]# echo "test flume 1234">>testfile
[iyunv@flume-hadoop-node-1 tmp]# echo "test flume 12345\r\n123456">>testfile
On the Flume side, the feedback appears in real time:
2012-01-06 20:42:55,818 [main] INFO agent.LogicalNodeManager: Loading node name with FlumeConfigData: {srcVer:'Thu Jan 01 08:00:00 CST 1970' snkVer:'Thu Jan 01 08:00:00 CST 1970'  ts='Thu Jan 01 08:00:00 CST 1970' flowId:'null' source:'tail( "testfile" )' sink:'console' }
2012-01-06 20:42:55,836 [main] INFO agent.LogicalNode: Node config successfully set to FlumeConfigData: {srcVer:'Thu Jan 01 08:00:00 CST 1970' snkVer:'Thu Jan 01 08:00:00 CST 1970'  ts='Thu Jan 01 08:00:00 CST 1970' flowId:'null' source:'tail( "testfile" )' sink:'console' }
2012-01-06 20:42:55,920 [logicalNode dump-10] INFO debug.ConsoleEventSink: ConsoleEventSink( debug ) opened
2012-01-06 20:42:55,973 [main] INFO agent.FlumeNode: Hadoop Security enabled: false
flume-hadoop-node-1 [INFO Fri Jan 06 20:43:21 CST 2012] { tailSrcFile : (long)8387236824819002469  (string) 'testfile' (double)4.914663849160389E252 } test flume
flume-hadoop-node-1 [INFO Fri Jan 06 20:43:36 CST 2012] { tailSrcFile : (long)8387236824819002469  (string) 'testfile' (double)4.914663849160389E252 } 123
flume-hadoop-node-1 [INFO Fri Jan 06 20:43:48 CST 2012] { tailSrcFile : (long)8387236824819002469  (string) 'testfile' (double)4.914663849160389E252 } test flume 123
flume-hadoop-node-1 [INFO Fri Jan 06 20:43:56 CST 2012] { tailSrcFile : (long)8387236824819002469  (string) 'testfile' (double)4.914663849160389E252 } test flume 1234
flume-hadoop-node-1 [INFO Fri Jan 06 20:44:11 CST 2012] { tailSrcFile : (long)8387236824819002469  (string) 'testfile' (double)4.914663849160389E252 } test flume 12345\\r\\n123456

Multiple files also work:


flume dump 'multitail("test1", "test2")'

By default, tail treats each line of the file as a separate event. The default delimiter is "\n", and the delimiter itself is not stripped. If you need a custom delimiter (given as a regular expression), that is supported too, with three modes:

"prev": the delimiter belongs to the previous event

"next": the delimiter belongs to the next event

"exclude": the delimiter is discarded


tail("file", delim="\n\n+", delimMode="exclude")
tail("file", delim="</a>", delimMode="prev")

Start a UDP syslog service listening on port 5140:


flume dump 'syslogUdp(5140)'

flume web console


http://10.129.8.125:35871/flumemaster.jsp


Cloudera Manager Free Edition


https://ccp.cloudera.com/display/express37/Cloudera+Manager+Free+Edition+Documentation


wget http://archive.cloudera.com/cloudera-manager/installer/latest/cloudera-manager-installer.bin
chmod a+x cloudera-manager-installer.bin
./cloudera-manager-installer.bin

Before installing, disable SELinux:


vi /etc/selinux/config
--
SELINUX=disabled
--

setenforce 0

./cloudera-manager-installer.bin

The installation failed. The logs showed that the packages could not be downloaded, so they had to be downloaded and installed manually.


Install the JDK manually:


wget http://archive.cloudera.com/cloudera-manager/redhat/5/x86_64/cloudera-manager/3/RPMS/jdk-6u21-linux-amd64.rpm
rpm -Uhv jdk-6u21-linux-amd64.rpm

http://archive.cloudera.com/cloudera-manager/redhat/5/x86_64/cloudera-manager/3/RPMS/cloudera-manager-daemons-3.7.2.143-1.noarch.rpm


----------------------------the unbearably fabulous divider-----------------------------------------


Two machines: 125 and 126

Configuration on 125:


vi /etc/flume/conf/flume-site.xml
<property>
<name>flume.collector.event.host</name>
<value>collector</value>
<description>This is the host name of the default "remote" collector.
</description>
</property>
<property>
<name>flume.collector.port</name>
<value>35853</value>
<description>This is the default TCP port that the collector listens to in order to receive the events it is collecting.
</description>
</property>

Start the Flume nodes:


flume node -n collector

HDFS server setup (freshly configured)




hdfs://10.129.8.126/



cp  /usr/lib/hadoop/example-confs/conf.pseudo/*  /etc/hadoop/conf/

mkdir /var/lib/hadoop-0.20/cache/hadoop/dfs/name -p
chmod 777 -R /var/lib/hadoop-0.20/
sudo -u hdfs hadoop namenode -format  # note: answer the prompt with an uppercase Y

[root@cloudera-node-1 logs]# hadoop fs -ls hdfs://127.0.0.1/
ls: Wrong FS: hdfs://127.0.0.1/, expected: hdfs://cloudera-node-1
Usage: java FsShell [-ls <path>]
[root@cloudera-node-1 logs]# hadoop fs -ls hdfs://cloudera-node-1
ls: Pathname  from hdfs://cloudera-node-1 is not a valid DFS filename.
Usage: java FsShell [-ls <path>]
[root@cloudera-node-1 logs]# hadoop fs -ls hdfs://cloudera-node-1/
[root@cloudera-node-1 logs]# hadoop fs -mkdir  hdfs://cloudera-node-1/test
[root@cloudera-node-1 logs]# hadoop fs -ls hdfs://cloudera-node-1/
Found 1 items
drwxr-xr-x   - root supergroup          0 2012-02-03 00:54 /test
[root@cloudera-node-1 logs]#

Modify the Hadoop configuration to use the external IP:



vi /etc/hadoop/conf/core-site.xml
<property>
<name>fs.default.name</name>
<value>hdfs://10.129.8.126:8020</value>
</property>
/etc/init.d/hadoop-0.20-namenode restart
[root@cloudera-node-1 logs]# hadoop fs -ls hdfs://10.129.8.126/
Found 1 items
drwxr-xr-x   - root supergroup          0 2012-02-03 00:54 /test

Set access permissions:



hadoop dfs -chmod 777  hdfs://10.129.8.126/flume/
hadoop dfs -chmod 777  hdfs://10.129.8.126/flume/*

On node 126, start Flume:


flume node_nowatch

Open the Flume master:


http://10.129.8.125:35871/flumemaster.jsp


cloudera-node-1 : text("/etc/services") | agentSink("10.129.8.125",35853);
collector : collectorSource(35853) | collectorSink("hdfs://10.129.8.126/flume/","srcdata");


Flume’s Tiered Event Sources


collectorSource[(port)]

Collector source. Listens for data from agentSinks forwarding to the given port. If port is not specified, the node's default collector TCP port, 35853, is used.



hadoop dfs -ls hdfs://10.129.8.126/flume/


An error on 125:



org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /flume/srcdata20120203-013616957+0800.2438481505068540.00000021.tmp could only be replicated to 0 nodes, instead of 1
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1520)
at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:665)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:557)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1434)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1430)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1157)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1428)
at org.apache.hadoop.ipc.Client.call(Client.java:1107)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:226)
at $Proxy6.addBlock(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at $Proxy6.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:3178)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:3047)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$1900(DFSClient.java:2305)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2500)

vi /etc/hadoop/conf/hdfs-site.xml

Setting the replica count to 0 did not help either.



<delete>
Set in /etc/hadoop/conf/hdfs-site.xml:
<property>
<name>dfs.thrift.address</name>
<value>10.129.8.126:10090</value>
</property>
</delete>

vi /etc/hadoop/conf/masters

Replace localhost with the IP 10.129.8.126


Still failing. Try a manual upload on 125:



vi a.txt
hadoop dfs -put a.txt hdfs://10.129.8.126/flume/srcdata20120203-014405668+0800.2438950215947540.00000019.tmp.1

Same error.


Running the same command on 126 produces the same error.


It looks like the datanode is dead even though the service reports it is running. Try restarting it.



[root@cloudera-node-1 ~]# /etc/init.d/hadoop-0.20-datanode status
datanode (pid  4866) is running...
[root@cloudera-node-1 ~]# /etc/init.d/hadoop-0.20-datanode restart
Stopping Hadoop datanode daemon (hadoop-datanode): stopping datanode
datanode is stopped                                        [  OK  ]
Starting Hadoop datanode daemon (hadoop-datanode): starting datanode, logging to /usr/lib/hadoop-0.20/logs/hadoop-hadoop-datanode-cloudera-node-1.out
datanode (pid  8570) is running...                         [  OK  ]
[root@cloudera-node-1 ~]# vi /usr/lib/hadoop/logs/hadoop-hadoop-datanode-cloudera-node-1.log
[root@cloudera-node-1 ~]# hadoop dfs -put a.txt hdfs://10.129.8.126/flume/srcdata20120203-014405668+0800.2438950215947540.00000019.tmp.12
put: Target hdfs://10.129.8.126/flume/srcdata20120203-014405668+0800.2438950215947540.00000019.tmp.12 already exists
[root@cloudera-node-1 ~]# hadoop dfs -put a.txt hdfs://10.129.8.126/flume/srcdata20120203-014405668+0800.2438950215947540.00000019.tmp.123
[root@cloudera-node-1 ~]#

OK, it works now.


If you hit safe mode:


2012-02-03 01:42:17,467 [logicalNode collector-19] INFO rolling.RollSink: closing RollSink 'escapedCustomDfs("hdfs://10.129.8.126/flume/","srcdata%{rolltag}" )'
2012-02-03 01:42:17,467 [logicalNode collector-19] INFO rolling.RollSink: opening RollSink  'escapedCustomDfs("hdfs://10.129.8.126/flume/","srcdata%{rolltag}" )'
2012-02-03 01:42:17,468 [logicalNode collector-19] INFO debug.InsistentOpenDecorator: Opened MaskDecorator on try 0
2012-02-03 01:42:17,469 [pool-7-thread-1] INFO hdfs.EscapedCustomDfsSink: Opening hdfs://10.129.8.126/flume/srcdata20120203-014217467+0800.2438842015436540.00000019
2012-02-03 01:42:17,476 [logicalNode collector-19] INFO debug.InsistentAppendDecorator: append attempt 3 failed, backoff (8000ms): org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot create file/flume/srcdata20120203-014217467+0800.2438842015436540.00000019.tmp. Name node is in safe mode.
The number of live datanodes 0 needs an additional 1 live datanodes to reach the minimum number 1. Safe mode will be turned off automatically.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1182)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1150)
at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:597)
at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:576)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:557)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1434)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1430)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1157)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1428)

Run:

hadoop dfsadmin -safemode leave


OK, run through it again.


On 125:

flume node_nowatch -n collector


On 126:

flume node_nowatch


OK, done.


[root@flume-hadoop-node-1 log]# hadoop fs -ls hdfs://10.129.8.126/flume/
Found 2 items
-rw-r--r--   3 root supergroup   11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
-rw-r--r--   3 root supergroup          0 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023.tmp
[root@flume-hadoop-node-1 log]# hadoop fs -ls hdfs://10.129.8.126/flume/
Found 2 items
-rw-r--r--   3 root supergroup   11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
-rw-r--r--   3 root supergroup    7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
[root@flume-hadoop-node-1 log]# hadoop fs -tail hdfs://10.129.8.126/flume/srcdata20120203-021531413+0800.2440835961446540.00000021
\t\t3881/udp\t\t\t# Data Acquisition and Control","timestamp":1328205987177,"pri":"INFO","nanos":100778763829457,"host":"cloudera-node-1","fields":{"AckTag":"20120203-020626329+0800.100777916519457.00000019","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000陋赂\u0010娄","rolltag":"20120203-021531413+0800.2440835961446540.00000021"}}
{"body":"msdts1\t\t3882/tcp\t\t\t# DTS Service Port","timestamp":1328205987177,"pri":"INFO","nanos":100778763863457,"host":"cloudera-node-1","fields":{"AckTag":"20120203-020626329+0800.100777916519457.00000019","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000?隆w?","rolltag":"20120203-021531413+0800.2440835961446540.00000021"}}
{"body":"msdts1\t\t3882/udp\t\t\t# DTS Service Port","timestamp":1328205987177,"pri":"INFO","nanos":100778763897457,"host":"cloudera-node-1","fields":{"AckTag":"20120203-020626329+0800.100777916519457.00000019","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u00005=?\u0002","rolltag":"20120203-021531413+0800.2440835961446540.00000021"}}

Adding a new Flume node

On 126:


flume node_nowatch -n agentAB


Add the configuration on the flume-master page:

agentAB : text("/var/log/dmesg") | agentSink("10.129.8.125",35853);


OK, no problems. Next, try the default configuration:


flume node_nowatch -n agentABC

agentABC : text("/tmp/medcl") | agentSink("10.129.8.125");


At this point, node status shows:

agentABC agentABC flume-hadoop-node-1 OPENING Fri Feb 03 02:31:11 CST 2012 3 Fri Feb 03 02:32:49 CST 2012


The console reports an error:


2012-02-03 02:31:14,823 [logicalNode agentABC-22] INFO connector.DirectDriver: Connector logicalNode agentABC-22 exited with error: /tmp/medcl (No such file or directory)
java.io.FileNotFoundException: /tmp/medcl (No such file or directory)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:212)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:98)
at com.cloudera.flume.handlers.debug.TextFileSource.open(TextFileSource.java:75)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:87)
Exception in thread "logicalNode agentABC-22" java.lang.NullPointerException
at com.cloudera.flume.handlers.debug.TextFileSource.close(TextFileSource.java:69)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.ensureClosed(DirectDriver.java:183)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.errorCleanup(DirectDriver.java:204)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:92)

Create the file:

echo "hello world" > /tmp/medcl


It keeps failing and cannot recover automatically; the only fix is to restart the node.


[root@cloudera-node-1 log]# hadoop dfs -tail hdfs://10.129.8.126/flume/srcdata20120203-023644240+0800.2442108787815540.00000021
{"body":"hello world","timestamp":1328207806233,"pri":"INFO","nanos":2442110780978540,"host":"flume-hadoop-node-1","fields":{"AckTag":"20120203-023646196+0800.2442110743929540.00000022","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000\rJ\u0011?","rolltag":"20120203-023644240+0800.2442108787815540.00000021"}}


flume node_nowatch -n agentABCD
agentABCD : text("/tmp/medcl") | agentSink("10.129.8.125");

The text source runs only once; later changes to the file are not processed.


tail, by contrast, keeps watching the file:


flume node_nowatch -n collector  # if the collector was stopped, restart it; its configuration is given above
flume node_nowatch -n agentABCDE
agentABCDE : tail("/tmp/medcl") | agentSink("10.129.8.125");

The collector writes to Hadoop every 30 seconds, creating a new HDFS file each time.
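The 30-second interval is the collector's roll interval. As a sketch, assuming the Flume 0.9 property flume.collector.roll.millis is what controls it, you could change it in flume-site.xml like this:

```xml
<property>
  <name>flume.collector.roll.millis</name>
  <!-- assumed property: roll (close and open a new HDFS file) every 60s instead of 30s -->
  <value>60000</value>
</property>
```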


[root@flume-hadoop-node-1 tmp]# echo &quot;happy new year&quot;>>medcl
[root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
Found 7 items
-rw-r--r--   3 root supergroup   11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413&#43;0800.2440835961446540.00000021
-rw-r--r--   3 root supergroup    7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232&#43;0800.2440869780410540.00000023
-rw-r--r--   3 root supergroup     197377 2012-02-03 02:25 /flume/srcdata20120203-022338788&#43;0800.2441323335749540.00000021
-rw-r--r--   3 root supergroup        318 2012-02-03 02:38 /flume/srcdata20120203-023644240&#43;0800.2442108787815540.00000021
-rw-r--r--   3 root supergroup     761621 2012-02-07 19:00 /flume/srcdata20120207-185754757&#43;0800.2846579304755540.00000021
-rw-r--r--   3 root supergroup        336 2012-02-07 19:02 /flume/srcdata20120207-185954947&#43;0800.2846699494856540.00000021
-rw-r--r--   3 root supergroup        329 2012-02-07 19:09 /flume/srcdata20120207-190658071&#43;0800.2847122618653540.00000021
[root@flume-hadoop-node-1 tmp]#
[root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
Found 8 items
-rw-r--r--   3 root supergroup   11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413&#43;0800.2440835961446540.00000021
-rw-r--r--   3 root supergroup    7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232&#43;0800.2440869780410540.00000023
-rw-r--r--   3 root supergroup     197377 2012-02-03 02:25 /flume/srcdata20120203-022338788&#43;0800.2441323335749540.00000021
-rw-r--r--   3 root supergroup        318 2012-02-03 02:38 /flume/srcdata20120203-023644240&#43;0800.2442108787815540.00000021
-rw-r--r--   3 root supergroup     761621 2012-02-07 19:00 /flume/srcdata20120207-185754757&#43;0800.2846579304755540.00000021
-rw-r--r--   3 root supergroup        336 2012-02-07 19:02 /flume/srcdata20120207-185954947&#43;0800.2846699494856540.00000021
-rw-r--r--   3 root supergroup        329 2012-02-07 19:09 /flume/srcdata20120207-190658071&#43;0800.2847122618653540.00000021
-rw-r--r--   3 root supergroup        337 2012-02-07 19:12 /flume/srcdata20120207-190929343&#43;0800.2847273890577540.00000021
[root@flume-hadoop-node-1 tmp]# hadoop fs -get hdfs://10.129.8.126/flume/srcdata20120207-190929343&#43;0800.2847273890577540.00000021 /tmp/lo2

Note: if the file's contents are replaced rather than appended to, the first new record is lost. Watch out for this (a bug?).


[root@flume-hadoop-node-1 tmp]# echo "who is your daddy?">medcl
[root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
Found 8 items
-rw-r--r--   3 root supergroup   11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
-rw-r--r--   3 root supergroup    7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
-rw-r--r--   3 root supergroup     197377 2012-02-03 02:25 /flume/srcdata20120203-022338788+0800.2441323335749540.00000021
-rw-r--r--   3 root supergroup        318 2012-02-03 02:38 /flume/srcdata20120203-023644240+0800.2442108787815540.00000021
-rw-r--r--   3 root supergroup     761621 2012-02-07 19:00 /flume/srcdata20120207-185754757+0800.2846579304755540.00000021
-rw-r--r--   3 root supergroup        336 2012-02-07 19:02 /flume/srcdata20120207-185954947+0800.2846699494856540.00000021
-rw-r--r--   3 root supergroup        329 2012-02-07 19:09 /flume/srcdata20120207-190658071+0800.2847122618653540.00000021
-rw-r--r--   3 root supergroup        337 2012-02-07 19:12 /flume/srcdata20120207-190929343+0800.2847273890577540.00000021

Append one more record:


[root@flume-hadoop-node-1 tmp]# echo "here is a new line">>medcl
[root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
Found 9 items
-rw-r--r--   3 root supergroup   11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
-rw-r--r--   3 root supergroup    7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
-rw-r--r--   3 root supergroup     197377 2012-02-03 02:25 /flume/srcdata20120203-022338788+0800.2441323335749540.00000021
-rw-r--r--   3 root supergroup        318 2012-02-03 02:38 /flume/srcdata20120203-023644240+0800.2442108787815540.00000021
-rw-r--r--   3 root supergroup     761621 2012-02-07 19:00 /flume/srcdata20120207-185754757+0800.2846579304755540.00000021
-rw-r--r--   3 root supergroup        336 2012-02-07 19:02 /flume/srcdata20120207-185954947+0800.2846699494856540.00000021
-rw-r--r--   3 root supergroup        329 2012-02-07 19:09 /flume/srcdata20120207-190658071+0800.2847122618653540.00000021
-rw-r--r--   3 root supergroup        337 2012-02-07 19:12 /flume/srcdata20120207-190929343+0800.2847273890577540.00000021
-rw-r--r--   3 root supergroup          0 2012-02-07 19:19 /flume/srcdata20120207-191702865+0800.2847727413000540.00000021.tmp
[root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
Found 9 items
-rw-r--r--   3 root supergroup   11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
-rw-r--r--   3 root supergroup    7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
-rw-r--r--   3 root supergroup     197377 2012-02-03 02:25 /flume/srcdata20120203-022338788+0800.2441323335749540.00000021
-rw-r--r--   3 root supergroup        318 2012-02-03 02:38 /flume/srcdata20120203-023644240+0800.2442108787815540.00000021
-rw-r--r--   3 root supergroup     761621 2012-02-07 19:00 /flume/srcdata20120207-185754757+0800.2846579304755540.00000021
-rw-r--r--   3 root supergroup        336 2012-02-07 19:02 /flume/srcdata20120207-185954947+0800.2846699494856540.00000021
-rw-r--r--   3 root supergroup        329 2012-02-07 19:09 /flume/srcdata20120207-190658071+0800.2847122618653540.00000021
-rw-r--r--   3 root supergroup        337 2012-02-07 19:12 /flume/srcdata20120207-190929343+0800.2847273890577540.00000021
-rw-r--r--   3 root supergroup        341 2012-02-07 19:19 /flume/srcdata20120207-191702865+0800.2847727413000540.00000021
[root@flume-hadoop-node-1 tmp]# hadoop fs -tail hdfs://10.129.8.126/flume/srcdata20120207-191702865+0800.2847727413000540.00000021
{"body":"here is a new line","timestamp":1328613446703,"pri":"INFO","nanos":2847751251273540,"host":"flume-hadoop-node-1","fields":{"AckTag":"20120207-191720960+0800.2847745508415540.00000025","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000/rN?","tailSrcFile":"medcl","rolltag":"20120207-191702865+0800.2847727413000540.00000021"}}

Sure enough, one record was lost.


OK. As mentioned earlier, Flume uses three delivery modes to balance reliability and availability:

1. End-to-end: both ends acknowledge, and failures are retried automatically (how many retries, and what happens once retries are exhausted, still needs investigation)

agentE2ESink[("machine"[,port])]


2. Disk failover: on failure, events are written to local disk and checked periodically; when the collector becomes available again, the work is redone automatically.

agentDFOSink[("machine"[,port])]


3. Best-effort mode: if the collector fails, the log is simply dropped. Ruthlessly efficient.

agentBESink[("machine"[,port])]


The agentSink used earlier is an alias for the first, end-to-end sink, and behaves identically.


Configuring multiple collectors


Multiple collectors improve throughput, since log collection runs in parallel. As mentioned earlier, for reliability an agent writes to local disk when its collector dies and periodically tries to reconnect; meanwhile, collection has stopped, so downstream log processing and analysis grind to a halt too, which is unacceptable.

Multiple collectors solve this problem.


Also, with several collectors, the agent should fail over automatically when one of them dies. How is that configured?


Use failover chains:


agentA : src | agentE2EChain("collectorA:35853","collectorB:35853");
agentB : src | agentE2EChain("collectorA:35853","collectorC:35853");
agentC : src | agentE2EChain("collectorB:35853","collectorA:35853");
agentD : src | agentE2EChain("collectorB:35853","collectorC:35853");
agentE : src | agentE2EChain("collectorC:35853","collectorA:35853");
agentF : src | agentE2EChain("collectorC:35853","collectorB:35853");
collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");

In the configuration above, each chain lists two collectors; when the first fails, the agent automatically switches to the second.


Automatic failover chains work by using special source and sink names (not available with multiple masters):


For the source, use:

autoCollectorSource


For the sink, use:

autoE2EChain, autoDFOChain, or autoBEChain


The configuration becomes:

agentA : src | autoE2EChain ;

agentB : src | autoE2EChain ;

agentC : src | autoE2EChain ;

agentD : src | autoE2EChain ;

agentE : src | autoE2EChain ;

agentF : src | autoE2EChain ;

collectorA : autoCollectorSource | collectorSink("hdfs://...", "src");

collectorB : autoCollectorSource | collectorSink("hdfs://...", "src");

collectorC : autoCollectorSource | collectorSink("hdfs://...", "src");


Logical Configurations

A physical node hosts a number of logical nodes; logical nodes in turn have logical sources and logical sinks, and flows are used to isolate and group nodes.


Logical nodes allow a single JVM instance to host several of them, running multiple source and sink threads inside one JVM.


Every logical node's name must be unique; it must not collide with a physical node name or host name either.


Defining logical nodes takes two steps:


1. Define the node types:

agent1 : _source_ | autoBEChain ;

collector1 : autoCollectorSource | collectorSink("hdfs://....") ;


2. Map the logical nodes to physical nodes:

map host1 agent1

map host2 collector1


3. To decommission a logical node:

decommission agent1


Let's try it.


On 125:

cd /tmp/
ls
rm -rf flume-*
/etc/init.d/flume-master restart
/etc/init.d/flume-node start

On 126:


/etc/init.d/flume-node start

On the flume master page, config:


agent1 : tail("/tmp/medcl") | autoBEChain ;
collector1 : autoCollectorSource | collectorSink("hdfs://10.129.8.126/flume/","medcl") ;

Note the hostname-to-IP mapping:

cloudera-node-1:10.129.8.126

flume-hadoop-node-1:10.129.8.125


raw command:


command: map
arguments: 10.129.8.125 agent1
# flume-hadoop-node-1 agent1
command: map
arguments: 10.129.8.126 collector1
# cloudera-node-1 collector1

Try decommissioning:

map 10.129.8.125 agent2
decommission agent2

(Watch the spacing: there must be no stray spaces around the decommission argument.)


Alternatively, use unmap and map to move a logical node:


unmap host2 collector1
map host3 collector1



A packet capture shows the request is:

curl -XPOST http://10.129.8.125:35871/mastersubmit.jsp -d'cmd=unmap&args=10.129.8.125+agent1'



Note: logical sources and logical sinks do not work with multiple masters.


Logical sources and logical sinks let you configure a flow before knowing the concrete physical nodes. Flume has a translation mechanism that automatically replaces logical node names with the actual host names and ports.

In fact, the autoSinks and auto-chains are implemented the same way.


Flow isolation (note: also unavailable with multiple masters, sadly)


Suppose you need to collect several kinds of data from one physical machine and store them in different places. One approach is to tag all the data, send it through a single pipe, and separate the streams in post-processing.

The other is to keep the two kinds of data isolated throughout transport, avoiding post-processing altogether.

Flume supports both, with low latency. It introduces the notion of a flow to group nodes; configure it as follows:

On the flume master page, under raw commands:


Command: config

Arguments: [logical node] [flow name] fooSrc autoBEChain


A real example:


config AgentC myflow tail("/tmp/medcl") autoBEChain
config CollectorC myflow autoCollectorSource collectorSink("hdfs://10.129.8.126/flume/","medcl_flow")
map 10.129.8.125 AgentC
map 10.129.8.126 CollectorC



------------

1. Problem:

fail( "logical node not mapped to physical node yet" )


Fixes:

1. Use the host name for the map: whatever name node status displays is the name to use when mapping.

2. Map the logical node first, then update the config.


A working configuration:


map cloudera-node-1 agent1
map flume-hadoop-node-1 collector1
agent1 : tail("/tmp/medcl") | agentSink("10.129.8.125",35853);
collector1 : collectorSource(35853) | collectorSink("hdfs://10.129.8.126/flume/","medcl");



Multi-master configuration

Masters synchronize with one another automatically; if one master dies, its nodes automatically move to another master.


The flume master has two modes of operation: standalone and distributed.

How are they configured?


<property>
<name>flume.master.servers</name>
<value>hostA,hostB</value>
</property>

A single host means standalone mode; multiple hosts mean distributed mode. [In distributed mode, every master must use an identical configuration file.]

In addition, each master must be configured with a distinct server id, as follows:


MasterA:
<property>
<name>flume.master.serverid</name>
<value>0</value>
</property>
MasterB:
<property>
<name>flume.master.serverid</name>
<value>1</value>
</property>

[The number just has to match that host's index in the server list configured above.]

In a distributed deployment, at least 3 servers are needed to tolerate one failure, and at least 5 to tolerate two simultaneous failures. If the surviving masters do not amount to more than half of the total, the whole flume master cluster blocks and configuration can be neither read nor written.
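The arithmetic behind this is plain majority quorum: with n masters, at most floor((n-1)/2) may fail. A quick shell check (an illustration only, not a Flume command):

```shell
# A quorum of n masters tolerates at most (n-1)/2 failures (integer division):
# a majority must stay alive for the cluster to make progress.
for n in 3 5 7; do
  echo "masters=$n tolerated failures=$(( (n - 1) / 2 ))"
done
```

This prints tolerated failure counts of 1, 2, and 3 for 3, 5, and 7 masters respectively.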


The place where the flume master stores configuration is called the configuration store. It is pluggable, with two built-in implementations:

the memory-backed MBCS, and the ZooKeeper-backed ZBCS

ZBCS is the default. Flume bundles ZooKeeper, and it can also be configured to use an existing ZooKeeper cluster.


<property>
<name>flume.master.store</name>
<value>zookeeper</value>
</property>

[Valid values: zookeeper or memory]


ZBCS configuration

flume.master.zk.logdir: stores configuration data, update logs, failure information, etc.

flume.master.zk.server.quorum.port: default 3182; the local listening port of the zookeeper server

flume.master.zk.server.election.port: default 3183; the port zookeeper servers use to find each other

flume.master.zk.client.port: default 3181; the port used to talk to the zookeeper server


Gossip protocol support for the FlumeMaster:


<property>
<name>flume.master.gossip.port</name>
<value>57890</value>
</property>

In distributed mode the flume node configuration also needs to change, from connecting to one master to connecting to several:


<property>
<name>flume.master.servers</name>
<value>masterA,masterB,masterC</value>
</property>

Each flume node periodically heartbeats against its master's port; as soon as the connection to the master fails, it automatically switches to a random master among those still reachable. [The heartbeat port on the master is configured via flume.master.heartbeat.port.]
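For completeness, a sketch of that property in flume-site.xml (35872 is the conventional default for this generation of Flume, one above the 35871 master HTTP port seen earlier, but treat the value as an assumption and check your build):

```xml
<property>
<name>flume.master.heartbeat.port</name>
<value>35872</value>
</property>
```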


To use an external zookeeper, configure conf/flume-site.xml as follows:


<property>
<name>flume.master.zk.use.external</name>
<value>true</value>
</property>
<property>
<name>flume.master.zk.servers</name>
<value>zkServerA:2181,zkServerB:2181,zkServerC:2181</value>
</property>

Integrating Flume with data sources

Flume's strength lies in its flexibility: it supports all kinds of sources, structured, unstructured, semi-structured, and so on.

Three approaches:

pushing, polling, and embedding (embedding flume components in your own application)


Push sources:

syslogTcp, syslogUdp: the syslog / syslog-ng logging protocols

scribe: the protocol of the scribe logging system


Polling:

tail, multitail: watch files for appended content

exec: good for extracting data from existing systems

poller: collects information from the flume node itself


The Flume event data model

Six main fields:

Unix timestamp

Nanosecond timestamp

Priority

Source host

Body

Metadata table with an arbitrary number of attribute-value pairs


Every event carries all of these fields, though the body may have length 0 and the metadata table may be empty.

priority: one of TRACE, DEBUG, INFO, WARN, ERROR, or FATAL

body: raw format, 32KB maximum by default; anything beyond that is truncated. Configurable via the parameter flume.event.max.size.bytes.
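As a sketch, raising or pinning that limit in flume-site.xml might look like this (32768 matches the stated 32KB default; the exact value here is illustrative):

```xml
<property>
<name>flume.event.max.size.bytes</name>
<value>32768</value>
</property>
```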


Using event fields to customize the output location

collectorSink("hdfs://namenode/flume/webdata/%H00/", "%{host}-")

%H is the hour from the event's timestamp field; host is the hostname from the event's fields.


Quick reference:

%{host}      host
%{nanos}     nanos
%{priority}  priority string
%{body}      body
%%           a % character
%t           Unix time in millis


Time escapes are special: they are used directly, without {}:

collectorSink(&quot;hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/&quot;, &quot;web-&quot;)


Quick reference:

%a  locale's short weekday name (Mon, Tue, …)
%A  locale's full weekday name (Monday, Tuesday, …)
%b  locale's short month name (Jan, Feb, …)
%B  locale's long month name (January, February, …)
%c  locale's date and time (Thu Mar 3 23:05:25 2005)
%d  day of month (01)
%D  date; same as %m/%d/%y
%H  hour (00..23)
%I  hour (01..12)
%j  day of year (001..366)
%k  hour ( 0..23)
%l  hour ( 1..12)
%m  month (01..12)
%M  minute (00..59)
%P  locale's equivalent of am or pm
%s  seconds since 1970-01-01 00:00:00 UTC
%S  second (00..60)
%y  last two digits of year (00..99)
%Y  year (2010)
%z  +hhmm numeric timezone (for example, -0400)
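These escapes behave like strftime plus Flume's own %{...} tag substitution. A rough Python re-implementation of the expansion, just to illustrate the mechanics (the expand helper is mine, not Flume code, and Flume-specific escapes like %t are not handled here):

```python
import re
from datetime import datetime

def expand(template: str, tags: dict, when: datetime) -> str:
    # First substitute %{name} tags from the event's metadata,
    # then let strftime handle the date/time escapes.
    path = re.sub(r"%\{(\w+)\}", lambda m: str(tags[m.group(1)]), template)
    return when.strftime(path)

print(expand("hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/%{host}-",
             {"host": "cloudera-node-1"},
             datetime(2012, 2, 17, 1, 30)))
# hdfs://namenode/flume/webdata/2012-02-17/0100/cloudera-node-1-
```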


Output file format

Two ways to set it:

one is the default value in flume-site.xml; the other is decided by the individual sink.

1. flume-site.xml

flume.collector.output.format


Format quick reference:

avro      Avro native file format. Default currently is uncompressed.
avrodata  Binary encoded data written in the avro binary format.
avrojson  JSON encoded data generated by avro.
default   a debugging format.
json      JSON encoded data.
log4j     a log4j pattern similar to that used by CDH output pattern.
raw       Event body only. This is most similar to copying a file but does not preserve any uniquifying metadata like host/timestamp/nanos.
syslog    a syslog-like text output format.
seqfile   the binary hadoop Sequence file format with WritableEventKeys keys, and WritableEvent as values.


2. Per-sink configuration


collectorSink("dfsdir","prefix"[, rollmillis[, format]])
text("file"[,format])
formatDfs("hdfs://nn/file" [, format])
escapedFormatDfs("hdfs://nn/file" [, format])

Compressed seqfile:

formatDfs("hdfs://nn/dir/file", seqfile("bzip2"))


Dealing with HDFS's many-small-files problem and high latency

Flume offers two strategies:

1. Merge small files into larger ones

2. Use CombinedFileInputFormat


<property>
<name>flume.collector.dfs.compress.codec</name>
<value>None</value>
<description>Writes formatted data compressed in specified codec to
dfs. Value is None, GzipCodec, DefaultCodec (deflate), BZip2Codec,
or any other Codec Hadoop is aware of </description>
</property>

seqfile and avrodata support internal compression; to be investigated further.


The DataFlow definition language

Fan out, write to all sinks:

[ console, collectorSink ]


Fail over: when the current sink fails, move on to the next candidate sink:

< logicalSink("collector1") ? logicalSink("collector2") >

Configuration example:


agent1 : source | < logicalSink("collector1") ? logicalSink("collector2") > ;

Roll sink: at a fixed interval, close the current sink instance and create a new one, so each interval produces a new, independent file:

roll(millis) sink

Configuration example:


roll(1000) [ console, escapedCustomDfs("hdfs://namenode/flume/file-%{rolltag}") ]

Sink decorators

Fan out and failover affect where messages go, but do not modify the data. To filter or transform the data, use a sink decorator.

Sink decorators can do many things: add attributes to the data stream, guarantee reliability via a write-ahead log, improve network throughput via batching or compression, sample the stream, or even run lightweight analysis.


flumenode: source | intervalSampler(10) sink;

flumenode: source | batch(100) sink;

flumenode: source | batch(100) gzip sink;

collector(15000) { escapedCustomDfs("xxx","yyy-%{rolltag}") }

collector(15000) { [ escapedCustomDfs("xxx","yyy-%{rolltag}"), hbase("aaa", "bbb-%{rolltag}"), elasticSearch("eeee","ffff") ] } [writes to three sinks at once; some may be durable, some transient; success is acknowledged only after all of them succeed]

node1 : tail("foo") | ackedWriteAhead batch(100) gzip lazyOpen stubbornAppend logicalSink("bar"); [write-ahead log, batches of 100, gzip compression]


Metadata supports extraction via regular expressions,

and filtering via a select-like syntax.


thriftSink and thriftSource


Extensions and plugins


http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_semantics_of_flume_extensions


The appendix is excellent:


http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_flume_source_catalog



map cloudera-node-1 agent2
agent2 : syslogTcp(2012) | agentSink("10.129.8.125",35853);
flume node_nowatch -n medcl
agent2 : syslogTcp(2012) | agentSink("10.129.8.125",35853);

Testing syslog messages


1. Connect with nc:
nc 10.129.8.126 2012
2. Type a syslog message (following the format described at http://blog.csdn.net/xcj0535/article/details/4158624):
<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% It's
time to make the do-nuts. %% Ingredients: Mix=OK, Jelly=OK #
Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport:
Conveyer1=OK, Conveyer2=OK # %%
<1> medcl is back
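The nc session above can also be scripted; a small sketch that frames a message the same way (the helper names are mine; the commented-out call assumes the collector host/port used in this test):

```python
import socket

def build_syslog_line(pri: int, msg: str) -> bytes:
    # Same shape as what was typed into nc: "<PRI> message\n".
    return f"<{pri}> {msg}\n".encode("utf-8")

def send_syslog(host: str, port: int, pri: int, msg: str) -> None:
    # Open a TCP connection to the syslogTcp source and push one line.
    with socket.create_connection((host, port)) as s:
        s.sendall(build_syslog_line(pri, msg))

print(build_syslog_line(1, "medcl is back"))
# send_syslog("10.129.8.126", 2012, 1, "medcl is back")
```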

The syslog message format
Here is a sample syslog message:
<30>Oct 9 22:33:20 hlfedora auditd[1787]: The audit daemon is exiting.
"<30>" is the PRI part, "Oct 9 22:33:20 hlfedora" is the HEADER part, and "auditd[1787]: The audit daemon is exiting." is the MSG part.
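The PRI value packs two numbers into one: PRI = facility × 8 + severity. Checking the <30> example above (3 = daemon facility, 6 = informational severity, per the syslog convention):

```python
def split_pri(pri: int) -> tuple:
    # facility = pri // 8, severity = pri % 8
    return divmod(pri, 8)

facility, severity = split_pri(30)
print(facility, severity)  # 3 6
```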

[iyunv@cloudera-node-1 ~]# hadoop fs -cat /flume/medcl20120209-221925655+0800.3031470203471540.00000026
{"body":"medcl is back","timestamp":1328797314800,"pri":"INFO","nanos":692106386851457,"host":"cloudera-node-1","fields":{"AckTag":"20120209-222148285+0800.692099872659457.00000037","syslogfacility":"\u0001","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000qu锚茫","syslogseverity":"\u0003","rolltag":"20120209-221925655+0800.3031470203471540.00000026"}}

The file uploaded to HDFS contains far too much extra content; with raw:

collector2 : syslogTcp( 2013) | collectorSink( "hdfs://10.129.8.126/flume/", "medcl_raw",3000,raw  );

C:\Windows\system32>nc 10.129.8.125 2013
<1> i will be back
<1> i will be back2
<1> i will be back3
<1> i will be back4
[iyunv@cloudera-node-1 ~]# hadoop fs -cat /flume/medcl_raw20120209-235000888+0800.3036905435701540.00000069
i will be back
i will be back2
i will be back3
i will be back4


A .NET agent with 25 threads brought it to its knees [and later testing also showed frequent unexplained socket disconnects: the server-side socket would simply die and flume would report an error].

2012-02-10 21:29:44,154 ERROR com.cloudera.flume.core.connector.DirectDriver: Exiting driver logicalNode collector2-20 in error state SyslogTcpSourceThreads | Collector because null


syslogTcp is unstable, so I switched decisively to thrift RPC as the source; testing confirmed it is very stable.



thrift-0.6.0.exe -r -gen csharp flume.thrift
2012-02-13 23:36:30,574 [pool-4-thread-1] ERROR server.TSaneThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.protocol.TProtocolException: Missing version in readMessageBegin, old client?
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:213)
at com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer$Processor.process(ThriftFlumeEventServer.java:224)
at org.apache.thrift.server.TSaneThreadPoolServer$WorkerProcess.run(TSaneThreadPoolServer.java:280)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)

This exception is likely caused by the server and client using mismatched transports, e.g. framed on one side and buffered on the other.



collector3 : thriftSource( 2014 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift",60000,raw  );
collector4 : thriftSource( 2015 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift",60000,raw  );
collector5 : thriftSource( 2016 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift",60000,raw  );
collector6 : thriftSource( 2017 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift",60000,raw  );
collector7 : thriftSource( 2018 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift2",30000);
map cloudera-node-1 collector7

Edit flume-site.xml to add compression and a default roll interval:


<property>
<name>flume.collector.dfs.compress.gzip</name>
<value>true</value>
<description>Writes compressed output in gzip format to dfs. value is
boolean type, i.e. true/false</description>
</property>
<property>
<name>flume.collector.roll.millis</name>
<value>60000</value>
<description>The time (in milliseconds)
between when hdfs files are closed and a new file is opened
(rolled).
</description>
</property>

Testing file templates


collector8 : thriftSource( 2019 )| collectorSink("hdfs://10.129.8.126/flume/app/%{host}/%Y-%m-%d/", "%H%M%S-test1-%t",5000);
map cloudera-node-1 collector8
[root@flume-hadoop-node-1 ~]# hadoop fs -lsr hdfs://10.129.8.126/flume/app
drwxr-xr-x   - flume supergroup          0 2012-02-17 00:49 /flume/app/MEDCL-THINK
drwxr-xr-x   - flume supergroup          0 2012-02-17 00:49 /flume/app/MEDCL-THINK/4113221-02-12
-rw-r--r--   1 flume supergroup        219 2012-02-17 00:49 /flume/app/MEDCL-THINK/4113221-02-12/203942-test1-12973855419598268720120217-004946767+0800.1305778353827457.00006891

Updated:



collector8 : thriftSource( 2019 )| collectorSink("hdfs://10.129.8.126/flume/%{catalog}/2012-%m/%d/", "%a-%{host}-",5000,raw());

Result:


/flume/FileTemplateRaw/2012-11/19/Fri-MEDCL-THINK-20120217-013005302+0800.1308196889416457.00007109

collector8 : thriftSource( 2019 )| collectorSink("hdfs://10.129.8.126/flume/%{catalog}/2012", "",5000,raw());
