设为首页 收藏本站
查看: 1453|回复: 0

[经验分享] flume-ng-taildirectory-source 修改调试可用

[复制链接]

尚未签到

发表于 2015-9-17 03:36:26 | 显示全部楼层 |阅读模式
  由于flume-ng至1.5版本仍旧没有稳定可用的类似flume-og中的taildir的功能,所以从git中https://github.com/jinoos/flume-ng-extends找了一个别人针对flume-ng实现的的taildir这个按照github上他自己说明,是没法正常使用的。查看了源码后,做了一些相应修改
  1. 默认的DirectoryTailParserModulable类修改
  他实现了2种DirectoryTailParserModulable
  第一种是SingleLineParserModule,适用日志里只有单条记录的。并且代码中默认就是使用的这个,显然很不靠谱。提供了配置项,但是说明里没有写出来,配置项为 ‘parser’.
     第二种是MultiLineParserModule,适用多行的日志文件的。这里我们大部分情况肯定是要用这个的。
  DirectoryTailSource类中如下行



  private static final String DEFAULT_PARSER_MODULE_CLASS = "com.jinoos.flume.SingleLineParserModule";
  修改为。包名根据实际情况来更改



  private static final String DEFAULT_PARSER_MODULE_CLASS = "org.apache.flume.source.taildirectory.MultiLineParserModule";
  2. first-line-pattern配置
  这是MultiLineParserModule中的一个属性,用来验证读进来的行是否为第一行。这个说明中也没提到
  如果没有配置这个配置,那么就无法正常执行,会报“wrong log format”。
  主要代码如下:



1         private void readMessage(FileSet fileSet) {
2             try {
3                 String buffer;
4
5                 synchronized (fileSet) {
6
7                     while ((buffer = fileSet.readLine()) != null) {
8                         if (buffer.length() == 0) {
9                             continue;
10                         }
11
12                         boolean isFirstLine = parserModule.isFirstLine(buffer);
13                         if (isFirstLine) {
14                             sendEvent(fileSet);
15                             fileSet.appendLine(buffer);
16                             parserModule.parse(buffer, fileSet);
17
18                         } else {
19                             if (fileSet.getLineSize() == 0) {
20                                 logger.debug("Wrong log format, " + buffer);
21                                 continue;
22                             } else {
23                                 fileSet.appendLine(buffer);
24                                 parserModule.parse(buffer, fileSet);
25                             }
26                         }
27
28                         if (parserModule.isLastLine(buffer)) {
29                             sendEvent(fileSet);
30                         }
31                     }
32                 }
33             } catch (IOException e) {
34                 logger.warn(e.getMessage(), e);
35             }
36         }
  根据我们的实际需求,我们不需要判断是否第一行,只要有change事件,全部写入到channel中即可
  修改为如下方式



1         // 파일을 읽고 Event를 생성한다.
2         private void readMessage(FileSet fileSet) {
3             try {
4                 String buffer;
5
6                 synchronized (fileSet) {
7
8                     while ((buffer = fileSet.readLine()) != null) {
9                         if (buffer.length() == 0) {
10                             continue;
11                         }
12
13                         fileSet.appendLine(buffer);
14                         sendEvent(fileSet);
15                     }
16                 }
17             } catch (IOException e) {
18                 logger.warn(e.getMessage(), e);
19             }
20         }
  改为这种方式后,只要来一行就会send到channel中。如果需要批量的,可以按自己要求更改。
  现在就不再需要关注first-line-pattern这个配置了。
  注意:但是配置在配置文件中还是配的,虽然它没有起到任何作用。如果想不配置,请修改MultiLineParserModule的configure(Context context)方法
  
  3.监控文件中有中文,编码的配置添加
  目前这个版本是无法支持中文的文件的。
  正式读取数据的方法:位置FileSet类中



  public String readLine() throws IOException {
return rReader.readLine();
}
  这个rReader是个RandomAccessFile对象



  public FileSet(AbstractSource source, FileObject fileObject)
throws IOException {
this.source = source;
this.fileObject = fileObject;
this.bufferList = new ArrayList<String>();
//File f = new File(fileObject.getName().getPath());
File f = new File("d:/tmp/log_compare/test1.txt");
rReader = new RandomAccessFile(f, "r");
rReader.seek(f.length());
bufferList = new ArrayList<String>();
headers = new HashMap<String, String>();
logger.debug("FileSet has been created " + fileObject.getName().getPath());
this.seq = 0L;
}
  在FileSet类实例化时创建。
  下面开始修改操作,源代码中是直接使用了RandomAccessFile的readline()方法,修改为按byte读取的方式



  /**
*
* @Title: readLine
* @Description: TODO(读取文件中的一行)
* @param @throws IOException    设定文件
* @return String    返回类型
* @throws
*/
public String readLine() throws IOException {
if(rReader.getFilePointer() < rReader.length()) {
byte b = rReader.readByte();//读取一个byte
int i = 0;
byte[] buf = new byte[10240];//创建大小为1M的数据,如果你的单行超过1M,那么会出错
//如果读到换行符,或者读到文件最后就停止。表示已经读完一行
while(b != '\n' && rReader.getFilePointer() < rReader.length()) {
buf[i++] = b;
b = rReader.readByte();
}
return new String(buf,0,i);
}else{
return "";
}
}
  改完后重新打包再次测试,发现已经可以支持中文了。
  4.每次新文件刚被创建时会丢失第一条数据
  代码如下



        public void run() {
while (true) {
try {
// DirectoryTailEvent event = eventQueue.poll(
// eventQueueWorkerTimeoutMiliSecond,
// TimeUnit.MILLISECONDS);
DirectoryTailEvent event = eventQueue.take();
if (event == null) {
continue;
}
if (event.type == FileEventType.FILE_CHANGED) {
fileChanged(event.event);
                } else if (event.type == FileEventType.FILE_CREATED) {
fileCreated(event.event);
} else if (event.type == FileEventType.FILE_DELETED) {
fileDeleted(event.event);
} else if (event.type == FileEventType.FLUSH) {
if (event.fileSet != null)
sendEvent(event.fileSet);
}
} catch (InterruptedException e) {
logger.debug(e.getMessage(), e);
} catch (FileSystemException e) {
logger.info(e.getMessage(), e);
}
}
}
  上面这段代码为监测的文件夹有新的事件时的处理。这里我们要看的是FILE_CREATE事件,他调用了fileCreated(event.event);



1     private void fileCreated(FileChangeEvent event)
2                 throws FileSystemException {
3             String path = event.getFile().getName().getPath();
4             String dirPath = event.getFile().getParent().getName().getPath();
5
6             logger.debug(path + " has been created.");
7
8             DirPattern dirPattern = null;
9             dirPattern = pathMap.get(dirPath);
10
11             if (dirPattern == null) {
12                 logger.warn("Occurred create event from un-indexed directory. "
13                         + dirPath);
14                 return;
15             }
16
17             // 파일명이 대상인지 검사한다.
18             if (!isInFilePattern(event.getFile(), dirPattern.getFilePattern())) {
19                 logger.debug(path + " is not in file pattern.");
20                 return;
21             }
22
23             FileSet fileSet;
24
25             fileSet = fileSetMap.get(event.getFile().getName().getPath());
26             //fileSet = fileSetMap.get(path);
27             if (fileSet == null) {
28                 try {
29                     logger.info(path
30                             + " is not in monitoring list. It's going to be listed.");
31                     
32                     fileSet = new FileSet(source, event.getFile());
33                     // a little synchronized bug here.fixed by tqli,2014-08-07
34                     // ,E-mail:tiangang1126@126.com
35                     synchronized (fileSetMap) {
36                         fileSetMap.put(path, fileSet);
37                     }
38                 } catch (IOException e) {
39                     logger.error(e.getMessage(), e);
40                     return;
41                 }
42             }
43         }
  看第27行,当新的文件进来,需要创建一个fileSet对象。将这个fileSet对象存入fileSetMap中
  看fileSet实例化的方法,上面已经贴过了



1   public FileSet(AbstractSource source, FileObject fileObject)
2       throws IOException {
3     this.source = source;
4     this.fileObject = fileObject;
5
6     this.bufferList = new ArrayList<String>();
7
8     File f = new File(fileObject.getName().getPath());
9     //File f = new File("d:/tmp/log_compare/test1.txt");
10     rReader = new RandomAccessFile(f, "r");
11     rReader.seek(f.length());
12     bufferList = new ArrayList<String>();
13     headers = new HashMap<String, String>();
14     logger.debug("FileSet has been created " + fileObject.getName().getPath());
15     logger.debug("file length now is : " + f.length());
16     this.seq = 0L;
17   }
  注意看第11行,将游标移到到f.length的位置,这样的问题就是跟着文件新建时写入的内容,全部被忽略了。这样就造成了数据丢失
  那怎么解决这个问题呢,简单的改为



rReader.seek(0);
肯定是不行的,具体的原因,大家自己思考下吧。
我们目的的就是在有监控新的事件时,创建的fileSet,游标位置能在文件原来的的位置。
需求明确了,下面就知道该做哪些事了。
1 首先在DirectoryTailSource中start方法执行时,将配置监控文件下符合正则条件文件的length都保存在一个Map里
2 在监听到新事件新建fileSet时,判断这个文件是新建的还是之前就存在的,如果是之前就存在的,那么就可以直接取之前记下的这个文件的大小。如果不存在,说明这个文件是个新文件,则从0位置开始读
注意:这个不支持文件更改的情况,只能适应只对文件做增加的场景
下面是代码修改的部分
DirectoryTailSource类

添加 fileInitLengthMap 属性


1     private Map<String, DirPattern> dirMap;
2     private Map<String, DirPattern> pathMap;
3     private Map<String,Long> fileInitLengthMap;//文件初始大小记录,用来定位新建fileSet时的游标初始位置
在configure方法中实例化fileInitLengthMap


    public void configure(Context context) {
logger.info("Source Configuring..");
dirMap = new HashMap<String, DirPattern>();
pathMap = new HashMap<String, DirPattern>();
fileInitLengthMap = new HashMap<String,Long>();
  在start方法中初始化fileInitLengthMap。保存全部符合正则条件的文件大小。红色部分为添加的代码



1     public void start() {
2         logger.info("Source Starting..");
3
4         if (sourceCounter == null) {
5             sourceCounter = new SourceCounter(getName());
6         }
7
8         fileSetMap = new Hashtable<String, FileSet>();
9
10         try {
11             fsManager = VFS.getManager();
12         } catch (FileSystemException e) {
13             logger.error(e.getMessage(), e);
14             return;
15         }
16
17         monitorRunnable = new MonitorRunnable();
18
19         fileMonitor = new DefaultFileMonitor(monitorRunnable);
20         fileMonitor.setRecursive(false);
21
22         FileObject fileObject;
23
24         logger.debug("Dirlist count " + dirMap.size());
25         for (Entry<String, DirPattern> entry : dirMap.entrySet()) {
26             logger.debug("Scan dir " + entry.getKey());
27
28             DirPattern dirPattern = entry.getValue();
29
30             try {
31                 fileObject = fsManager.resolveFile(dirPattern.getPath());
32             } catch (FileSystemException e) {
33                 logger.error(e.getMessage(), e);
34                 continue;
35             }
36
37             try {
38                 if (!fileObject.isReadable()) {
39                     logger.warn("No have readable permission, "
40                             + fileObject.getURL());
41                     continue;
42                 }
43
44                 if (FileType.FOLDER != fileObject.getType()) {
45                     logger.warn("Not a directory, " + fileObject.getURL());
46                     continue;
47                 }
48
49                 // 폴더를 Monitoring 대상에 추가한다.
50                 fileMonitor.addFile(fileObject);
51                 logger.debug(fileObject.getName().getPath()
52                         + " directory has been add in monitoring list");
53                 pathMap.put(fileObject.getName().getPath(), entry.getValue());
54                 //pathMap.put("d:/tmp/log_compare", entry.getValue());
55                 //新增部分,文件初始化大小保存
56                 FileObject[] allChiledfile = fileObject.getChildren();
57                 for(FileObject chiledFileobject : allChiledfile) {
58                     if(dirPattern.getFilePattern().matcher(chiledFileobject.getName().getBaseName()).find()) {
59                         String chiledFildPath = chiledFileobject.getName().getPath();
60                         //String chiledFildPath = "d:/tmp/log_compare/test1.txt";
61                         File chiledfile = new File(chiledFildPath);
62                         fileInitLengthMap.put(chiledFildPath,
63                                 chiledfile.length());
64                         logger.debug(chiledFildPath + " init length is :" + chiledfile.length());
65                     }
66                 }
67             } catch (FileSystemException e) {
68                 logger.warn(e.getMessage(), e);
69                 continue;
70             } catch (Exception e) {
71                 logger.debug(e.getMessage(), e);
72             }
73
74         }
75
76         executorService = Executors
77                 .newFixedThreadPool(eventQueueWorkerSize + 1);
78         monitorFuture = executorService.submit(monitorRunnable);
79
80         for (int i = 0; i < eventQueueWorkerSize; i++) {
81             workerFuture = executorService.submit(new WorkerRunnable(this));
82         }
83
84         sourceCounter.start();
85         super.start();
86     }
  FileSet类
  



1   public FileSet(AbstractSource source, FileObject fileObject,Map<String,Long> fileInitLengthMap)
2       throws IOException {
3     this.source = source;
4     this.fileObject = fileObject;
5
6     this.bufferList = new ArrayList<String>();
7
8     File f = new File(fileObject.getName().getPath());
9     rReader = new RandomAccessFile(f, "r");
10     /*
11      *判断在初始化taildirSource时,这个文件是否存在,如果存在则游标定位当时记录下的文件长度开始
12      *如果不存在,则说明这是一个新建的文件,游标从0开始
13      */
14     if(fileInitLengthMap.containsKey(fileObject.getName().getPath())) {
15         rReader.seek(fileInitLengthMap.get(fileObject.getName().getPath()));
16     }else{
17         rReader.seek(0);
18     }
19     
20     bufferList = new ArrayList<String>();
21     headers = new HashMap<String, String>();
22     logger.debug("FileSet has been created " + fileObject.getName().getPath());
23     logger.debug("file length now is : " + f.length());
24     this.seq = 0L;
25   }
  
  修改类实例化的方法。并修改DirectoryTailSource类中调用FileSet实例化方法的地方。
  至此修改全部全部完成。
  
  没找到能上传附件的地方,改完的jar包就不提供了。
  此为一个使用这个jar的例子



a.sources = sources
a.sinks = sinks
a.channels = c
#configure sources
a.sources.sources.type = org.apache.flume.source.taildirectory.DirectoryTailSource
a.sources.sources.dirs = s0
#a.sources.sources.dirs.s0.path = /usr/local/nginx/logs/
a.sources.sources.dirs.s0.path = /home/flume/testTailDir
a.sources.sources.dirs.s0.file-pattern = ^access_.*log$
a.sources.sources.first-line-pattern = ^(.*)$
#congfigure sinks
a.sinks.sinks.type = file_roll
a.sinks.sinks.sink.directory = /home/flume/testTailDir2
a.sinks.sinks.sink.rollInterval = 30
a.sinks.sinks.channel = c
#configure channals
a.channels.c.type = memory
#bind channel
a.sources.sources.channels = c
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114547-1-1.html 上篇帖子: apache-flume-1.5.0-bin windows 下篇帖子: Flume实例一学习
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表