设为首页 收藏本站
查看: 2758|回复: 0

[经验分享] flume学习(七)、(八):如何使用event header中的key值以及自定义source

[复制链接]

尚未签到

发表于 2015-11-28 15:56:11 | 显示全部楼层 |阅读模式
1.如何使用event header中的key值?
2.如何部署扩展自定义的spooling directory source?


DSC0000.gif




前面我们已经说到我们在header中添加了一个key为:flume.client.log4j.logger.source  ,然后有两个应用程序,一个设置为app1,一个设置为app2。

现在有这么一个需求,要将app1的日志输出到hdfs://master68:8020/flume/events/app1目录下面,app2的日志输出到hdfs://master68:8020/flume/events/app2目录下面,未来也可能有更多的应用程序的日志输出,也即每个程序的日志输出到各自自己的目录下面。

有了前面的头信息之后,我们可以非常简单的实现这个需求:

tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{flume.client.log4j.logger.source}

只需要改一下这一行,用%{flume.client.log4j.logger.source},来替换具体的app日志目录即可。

按照以往的惯例,还是需求驱动学习,有位网友在我的flume学习五中留言提了一个问题如下:
我想实现一个功能,就在读一个文件的时候,将文件的名字和文件生成的日期作为event的header传到hdfs上时,不同的event存到不同的目录下,如一个文件是a.log.2014-07-25在hdfs上是存到/a/2014-07-25目录下,a.log.2014-07-26存到/a/2014-07-26目录下,就是每个文件对应自己的目录,这个要怎么实现。

带着这个问题,我又重新翻看了官方的文档,发现一个spooling directory source跟这个需求稍微有点吻合:它监视指定的文件夹下面有没有写入新的文件,有的话,就会把该文件内容传递给sink,然后将该文件后缀标示为.complete,表示已处理。提供了参数可以将文件名和文件全路径名添加到event的header中去。

现有的功能不能满足我们的需求,但是至少提供了一个方向:它能将文件名放入header!
当时就在祈祷源代码不要太复杂,这样我们在这个地方稍微修改修改,把文件名拆分一下,然后再放入header,这样就完成了我们想要的功能了。

于是就打开了源代码,果然不复杂,代码结构非常清晰,按照我的思路,稍微改了一下,就实现了这个功能,主要修改了与spooling directory source代码相关的三个类,分别是:ReliableSpoolingFileEventExtReader,SpoolDirectorySourceConfigurationExtConstants,SpoolDirectoryExtSource(在原类名的基础上增加了:Ext)代码如下:

  • /*
  • * Licensed to the Apache Software Foundation (ASF) under one
  • * or more contributor license agreements.  See the NOTICE file
  • * distributed with this work for additional information
  • * regarding copyright ownership.  The ASF licenses this file
  • * to you under the Apache License, Version 2.0 (the
  • * "License"); you may not use this file except in compliance
  • * with the License.  You may obtain a copy of the License at
  • *
  • * http://www.apache.org/licenses/LICENSE-2.0
  • *
  • * Unless required by applicable law or agreed to in writing,
  • * software distributed under the License is distributed on an
  • * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  • * KIND, either express or implied.  See the License for the
  • * specific language governing permissions and limitations
  • * under the License.
  • */

  • package com.besttone.flume;

  • import java.io.File;
  • import java.io.FileFilter;
  • import java.io.FileNotFoundException;
  • import java.io.IOException;
  • import java.nio.charset.Charset;
  • import java.util.Arrays;
  • import java.util.Collections;
  • import java.util.Comparator;
  • import java.util.List;
  • import java.util.regex.Matcher;
  • import java.util.regex.Pattern;

  • import org.apache.flume.Context;
  • import org.apache.flume.Event;
  • import org.apache.flume.FlumeException;
  • import org.apache.flume.annotations.InterfaceAudience;
  • import org.apache.flume.annotations.InterfaceStability;
  • import org.apache.flume.client.avro.ReliableEventReader;
  • import org.apache.flume.serialization.DecodeErrorPolicy;
  • import org.apache.flume.serialization.DurablePositionTracker;
  • import org.apache.flume.serialization.EventDeserializer;
  • import org.apache.flume.serialization.EventDeserializerFactory;
  • import org.apache.flume.serialization.PositionTracker;
  • import org.apache.flume.serialization.ResettableFileInputStream;
  • import org.apache.flume.serialization.ResettableInputStream;
  • import org.apache.flume.tools.PlatformDetect;
  • import org.joda.time.DateTime;
  • import org.joda.time.format.DateTimeFormat;
  • import org.joda.time.format.DateTimeFormatter;
  • import org.slf4j.Logger;
  • import org.slf4j.LoggerFactory;

  • import com.google.common.base.Charsets;
  • import com.google.common.base.Optional;
  • import com.google.common.base.Preconditions;
  • import com.google.common.io.Files;

  • /**
  • * <p/>
  • * A {@link ReliableEventReader} which reads log data from files stored in a
  • * spooling directory and renames each file once all of its data has been read
  • * (through {@link EventDeserializer#readEvent()} calls). The user must
  • * {@link #commit()} each read, to indicate that the lines have been fully
  • * processed.
  • * <p/>
  • * Read calls will return no data if there are no files left to read. This
  • * class, in general, is not thread safe.
  • *
  • * <p/>
  • * This reader assumes that files with unique file names are left in the
  • * spooling directory and not modified once they are placed there. Any user
  • * behavior which violates these assumptions, when detected, will result in a
  • * FlumeException being thrown.
  • *
  • * <p/>
  • * This class makes the following guarantees, if above assumptions are met:
  • * <ul>
  • * <li>Once a log file has been renamed with the {@link #completedSuffix}, all
  • * of its records have been read through the
  • * {@link EventDeserializer#readEvent()} function and {@link #commit()}ed at
  • * least once.
  • * <li>All files in the spooling directory will eventually be opened and
  • * delivered to a {@link #readEvents(int)} caller.
  • * </ul>
  • */
  • @InterfaceAudience.Private
  • @InterfaceStability.Evolving
  • public class ReliableSpoolingFileEventExtReader implements ReliableEventReader {

  •         private static final Logger logger = LoggerFactory
  •                         .getLogger(ReliableSpoolingFileEventExtReader.class);

  •         static final String metaFileName = &quot;.flumespool-main.meta&quot;;

  •         private final File spoolDirectory;
  •         private final String completedSuffix;
  •         private final String deserializerType;
  •         private final Context deserializerContext;
  •         private final Pattern ignorePattern;
  •         private final File metaFile;
  •         private final boolean annotateFileName;
  •         private final boolean annotateBaseName;
  •         private final String fileNameHeader;
  •         private final String baseNameHeader;

  •         // 添加参数开始
  •         private final boolean annotateFileNameExtractor;
  •         private final String fileNameExtractorHeader;
  •         private final Pattern fileNameExtractorPattern;
  •         private final boolean convertToTimestamp;
  •         private final String dateTimeFormat;

  •         private final boolean splitFileName;
  •         private final String splitBy;
  •         private final String splitBaseNameHeader;
  •         // 添加参数结束

  •         private final String deletePolicy;
  •         private final Charset inputCharset;
  •         private final DecodeErrorPolicy decodeErrorPolicy;

  •         private Optional<FileInfo> currentFile = Optional.absent();
  •         /** Always contains the last file from which lines have been read. **/
  •         private Optional<FileInfo> lastFileRead = Optional.absent();
  •         private boolean committed = true;

  •         /**
  •          * Create a ReliableSpoolingFileEventReader to watch the given directory.
  •          */
  •         private ReliableSpoolingFileEventExtReader(File spoolDirectory,
  •                         String completedSuffix, String ignorePattern,
  •                         String trackerDirPath, boolean annotateFileName,
  •                         String fileNameHeader, boolean annotateBaseName,
  •                         String baseNameHeader, String deserializerType,
  •                         Context deserializerContext, String deletePolicy,
  •                         String inputCharset, DecodeErrorPolicy decodeErrorPolicy,
  •                         boolean annotateFileNameExtractor, String fileNameExtractorHeader,
  •                         String fileNameExtractorPattern, boolean convertToTimestamp,
  •                         String dateTimeFormat, boolean splitFileName, String splitBy,
  •                         String splitBaseNameHeader) throws IOException {

  •                 // Sanity checks
  •                 Preconditions.checkNotNull(spoolDirectory);
  •                 Preconditions.checkNotNull(completedSuffix);
  •                 Preconditions.checkNotNull(ignorePattern);
  •                 Preconditions.checkNotNull(trackerDirPath);
  •                 Preconditions.checkNotNull(deserializerType);
  •                 Preconditions.checkNotNull(deserializerContext);
  •                 Preconditions.checkNotNull(deletePolicy);
  •                 Preconditions.checkNotNull(inputCharset);

  •                 // validate delete policy
  •                 if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())
  •                                 && !deletePolicy
  •                                                 .equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
  •                         throw new IllegalArgumentException(&quot;Delete policies other than &quot;
  •                                         &#43; &quot;NEVER and IMMEDIATE are not yet supported&quot;);
  •                 }

  •                 if (logger.isDebugEnabled()) {
  •                         logger.debug(&quot;Initializing {} with directory={}, metaDir={}, &quot;
  •                                         &#43; &quot;deserializer={}&quot;, new Object[] {
  •                                         ReliableSpoolingFileEventExtReader.class.getSimpleName(),
  •                                         spoolDirectory, trackerDirPath, deserializerType });
  •                 }

  •                 // Verify directory exists and is readable/writable
  •                 Preconditions
  •                                 .checkState(
  •                                                 spoolDirectory.exists(),
  •                                                 &quot;Directory does not exist: &quot;
  •                                                                 &#43; spoolDirectory.getAbsolutePath());
  •                 Preconditions.checkState(spoolDirectory.isDirectory(),
  •                                 &quot;Path is not a directory: &quot; &#43; spoolDirectory.getAbsolutePath());

  •                 // Do a canary test to make sure we have access to spooling directory
  •                 try {
  •                         File canary = File.createTempFile(&quot;flume-spooldir-perm-check-&quot;,
  •                                         &quot;.canary&quot;, spoolDirectory);
  •                         Files.write(&quot;testing flume file permissions\n&quot;, canary,
  •                                         Charsets.UTF_8);
  •                         List<String> lines = Files.readLines(canary, Charsets.UTF_8);
  •                         Preconditions.checkState(!lines.isEmpty(), &quot;Empty canary file %s&quot;,
  •                                         canary);
  •                         if (!canary.delete()) {
  •                                 throw new IOException(&quot;Unable to delete canary file &quot; &#43; canary);
  •                         }
  •                         logger.debug(&quot;Successfully created and deleted canary file: {}&quot;,
  •                                         canary);
  •                 } catch (IOException e) {
  •                         throw new FlumeException(&quot;Unable to read and modify files&quot;
  •                                         &#43; &quot; in the spooling directory: &quot; &#43; spoolDirectory, e);
  •                 }

  •                 this.spoolDirectory = spoolDirectory;
  •                 this.completedSuffix = completedSuffix;
  •                 this.deserializerType = deserializerType;
  •                 this.deserializerContext = deserializerContext;
  •                 this.annotateFileName = annotateFileName;
  •                 this.fileNameHeader = fileNameHeader;
  •                 this.annotateBaseName = annotateBaseName;
  •                 this.baseNameHeader = baseNameHeader;
  •                 this.ignorePattern = Pattern.compile(ignorePattern);
  •                 this.deletePolicy = deletePolicy;
  •                 this.inputCharset = Charset.forName(inputCharset);
  •                 this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);

  •                 // 增加代码开始
  •                 this.annotateFileNameExtractor = annotateFileNameExtractor;
  •                 this.fileNameExtractorHeader = fileNameExtractorHeader;
  •                 this.fileNameExtractorPattern = Pattern
  •                                 .compile(fileNameExtractorPattern);
  •                 this.convertToTimestamp = convertToTimestamp;
  •                 this.dateTimeFormat = dateTimeFormat;

  •                 this.splitFileName = splitFileName;
  •                 this.splitBy = splitBy;
  •                 this.splitBaseNameHeader = splitBaseNameHeader;
  •                 // 增加代码结束

  •                 File trackerDirectory = new File(trackerDirPath);

  •                 // if relative path, treat as relative to spool directory
  •                 if (!trackerDirectory.isAbsolute()) {
  •                         trackerDirectory = new File(spoolDirectory, trackerDirPath);
  •                 }

  •                 // ensure that meta directory exists
  •                 if (!trackerDirectory.exists()) {
  •                         if (!trackerDirectory.mkdir()) {
  •                                 throw new IOException(
  •                                                 &quot;Unable to mkdir nonexistent meta directory &quot;
  •                                                                 &#43; trackerDirectory);
  •                         }
  •                 }

  •                 // ensure that the meta directory is a directory
  •                 if (!trackerDirectory.isDirectory()) {
  •                         throw new IOException(&quot;Specified meta directory is not a directory&quot;
  •                                         &#43; trackerDirectory);
  •                 }

  •                 this.metaFile = new File(trackerDirectory, metaFileName);
  •         }

  •         /**
  •          * Return the filename which generated the data from the last successful
  •          * {@link #readEvents(int)} call. Returns null if called before any file
  •          * contents are read.
  •          */
  •         public String getLastFileRead() {
  •                 if (!lastFileRead.isPresent()) {
  •                         return null;
  •                 }
  •                 return lastFileRead.get().getFile().getAbsolutePath();
  •         }

  •         // public interface
  •         public Event readEvent() throws IOException {
  •                 List<Event> events = readEvents(1);
  •                 if (!events.isEmpty()) {
  •                         return events.get(0);
  •                 } else {
  •                         return null;
  •                 }
  •         }

  •         public List<Event> readEvents(int numEvents) throws IOException {
  •                 if (!committed) {
  •                         if (!currentFile.isPresent()) {
  •                                 throw new IllegalStateException(&quot;File should not roll when &quot;
  •                                                 &#43; &quot;commit is outstanding.&quot;);
  •                         }
  •                         logger.info(&quot;Last read was never committed - resetting mark position.&quot;);
  •                         currentFile.get().getDeserializer().reset();
  •                 } else {
  •                         // Check if new files have arrived since last call
  •                         if (!currentFile.isPresent()) {
  •                                 currentFile = getNextFile();
  •                         }
  •                         // Return empty list if no new files
  •                         if (!currentFile.isPresent()) {
  •                                 return Collections.emptyList();
  •                         }
  •                 }

  •                 EventDeserializer des = currentFile.get().getDeserializer();
  •                 List<Event> events = des.readEvents(numEvents);

  •                 /*
  •                  * It's possible that the last read took us just up to a file boundary.
  •                  * If so, try to roll to the next file, if there is one.
  •                  */
  •                 if (events.isEmpty()) {
  •                         retireCurrentFile();
  •                         currentFile = getNextFile();
  •                         if (!currentFile.isPresent()) {
  •                                 return Collections.emptyList();
  •                         }
  •                         events = currentFile.get().getDeserializer().readEvents(numEvents);
  •                 }

  •                 if (annotateFileName) {
  •                         String filename = currentFile.get().getFile().getAbsolutePath();
  •                         for (Event event : events) {
  •                                 event.getHeaders().put(fileNameHeader, filename);
  •                         }
  •                 }

  •                 if (annotateBaseName) {
  •                         String basename = currentFile.get().getFile().getName();
  •                         for (Event event : events) {
  •                                 event.getHeaders().put(baseNameHeader, basename);
  •                         }
  •                 }

  •                 // 增加代码开始

  •                 // 按正则抽取文件名的内容
  •                 if (annotateFileNameExtractor) {

  •                         Matcher matcher = fileNameExtractorPattern.matcher(currentFile
  •                                         .get().getFile().getName());

  •                         if (matcher.find()) {
  •                                 String value = matcher.group();
  •                                 if (convertToTimestamp) {
  •                                         DateTimeFormatter formatter = DateTimeFormat
  •                                                         .forPattern(dateTimeFormat);
  •                                         DateTime dateTime = formatter.parseDateTime(value);

  •                                         value = Long.toString(dateTime.getMillis());
  •                                 }

  •                                 for (Event event : events) {
  •                                         event.getHeaders().put(fileNameExtractorHeader, value);
  •                                 }
  •                         }

  •                 }

  •                 // 按分隔符拆分文件名
  •                 if (splitFileName) {
  •                         String[] splits = currentFile.get().getFile().getName()
  •                                         .split(splitBy);

  •                         for (Event event : events) {
  •                                 for (int i = 0; i < splits.length; i&#43;&#43;) {
  •                                         event.getHeaders().put(splitBaseNameHeader &#43; i, splits);
  •                                 }

  •                         }

  •                 }

  •                 // 增加代码结束

  •                 committed = false;
  •                 lastFileRead = currentFile;
  •                 return events;
  •         }

  •         @Override
  •         public void close() throws IOException {
  •                 if (currentFile.isPresent()) {
  •                         currentFile.get().getDeserializer().close();
  •                         currentFile = Optional.absent();
  •                 }
  •         }

  •         /** Commit the last lines which were read. */
  •         @Override
  •         public void commit() throws IOException {
  •                 if (!committed && currentFile.isPresent()) {
  •                         currentFile.get().getDeserializer().mark();
  •                         committed = true;
  •                 }
  •         }

  •         /**
  •          * Closes currentFile and attempt to rename it.
  •          *
  •          * If these operations fail in a way that may cause duplicate log entries,
  •          * an error is logged but no exceptions are thrown. If these operations fail
  •          * in a way that indicates potential misuse of the spooling directory, a
  •          * FlumeException will be thrown.
  •          *
  •          * @throws FlumeException
  •          *             if files do not conform to spooling assumptions
  •          */
  •         private void retireCurrentFile() throws IOException {
  •                 Preconditions.checkState(currentFile.isPresent());

  •                 File fileToRoll = new File(currentFile.get().getFile()
  •                                 .getAbsolutePath());

  •                 currentFile.get().getDeserializer().close();

  •                 // Verify that spooling assumptions hold
  •                 if (fileToRoll.lastModified() != currentFile.get().getLastModified()) {
  •                         String message = &quot;File has been modified since being read: &quot;
  •                                         &#43; fileToRoll;
  •                         throw new IllegalStateException(message);
  •                 }
  •                 if (fileToRoll.length() != currentFile.get().getLength()) {
  •                         String message = &quot;File has changed size since being read: &quot;
  •                                         &#43; fileToRoll;
  •                         throw new IllegalStateException(message);
  •                 }

  •                 if (deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) {
  •                         rollCurrentFile(fileToRoll);
  •                 } else if (deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
  •                         deleteCurrentFile(fileToRoll);
  •                 } else {
  •                         // TODO: implement delay in the future
  •                         throw new IllegalArgumentException(&quot;Unsupported delete policy: &quot;
  •                                         &#43; deletePolicy);
  •                 }
  •         }

  •         /**
  •          * Rename the given spooled file
  •          *
  •          * @param fileToRoll
  •          * @throws IOException
  •          */
  •         private void rollCurrentFile(File fileToRoll) throws IOException {

  •                 File dest = new File(fileToRoll.getPath() &#43; completedSuffix);
  •                 logger.info(&quot;Preparing to move file {} to {}&quot;, fileToRoll, dest);

  •                 // Before renaming, check whether destination file name exists
  •                 if (dest.exists() && PlatformDetect.isWindows()) {
  •                         /*
  •                          * If we are here, it means the completed file already exists. In
  •                          * almost every case this means the user is violating an assumption
  •                          * of Flume (that log files are placed in the spooling directory
  •                          * with unique names). However, there is a corner case on Windows
  •                          * systems where the file was already rolled but the rename was not
  •                          * atomic. If that seems likely, we let it pass with only a warning.
  •                          */
  •                         if (Files.equal(currentFile.get().getFile(), dest)) {
  •                                 logger.warn(&quot;Completed file &quot; &#43; dest
  •                                                 &#43; &quot; already exists, but files match, so continuing.&quot;);
  •                                 boolean deleted = fileToRoll.delete();
  •                                 if (!deleted) {
  •                                         logger.error(&quot;Unable to delete file &quot;
  •                                                         &#43; fileToRoll.getAbsolutePath()
  •                                                         &#43; &quot;. It will likely be ingested another time.&quot;);
  •                                 }
  •                         } else {
  •                                 String message = &quot;File name has been re-used with different&quot;
  •                                                 &#43; &quot; files. Spooling assumptions violated for &quot; &#43; dest;
  •                                 throw new IllegalStateException(message);
  •                         }

  •                         // Dest file exists and not on windows
  •                 } else if (dest.exists()) {
  •                         String message = &quot;File name has been re-used with different&quot;
  •                                         &#43; &quot; files. Spooling assumptions violated for &quot; &#43; dest;
  •                         throw new IllegalStateException(message);

  •                         // Destination file does not already exist. We are good to go!
  •                 } else {
  •                         boolean renamed = fileToRoll.renameTo(dest);
  •                         if (renamed) {
  •                                 logger.debug(&quot;Successfully rolled file {} to {}&quot;, fileToRoll,
  •                                                 dest);

  •                                 // now we no longer need the meta file
  •                                 deleteMetaFile();
  •                         } else {
  •                                 /*
  •                                  * If we are here then the file cannot be renamed for a reason
  •                                  * other than that the destination file exists (actually, that
  •                                  * remains possible w/ small probability due to TOC-TOU
  •                                  * conditions).
  •                                  */
  •                                 String message = &quot;Unable to move &quot;
  •                                                 &#43; fileToRoll
  •                                                 &#43; &quot; to &quot;
  •                                                 &#43; dest
  •                                                 &#43; &quot;. This will likely cause duplicate events. Please verify that &quot;
  •                                                 &#43; &quot;flume has sufficient permissions to perform these operations.&quot;;
  •                                 throw new FlumeException(message);
  •                         }
  •                 }
  •         }

  •         /**
  •          * Delete the given spooled file
  •          *
  •          * @param fileToDelete
  •          * @throws IOException
  •          */
  •         private void deleteCurrentFile(File fileToDelete) throws IOException {
  •                 logger.info(&quot;Preparing to delete file {}&quot;, fileToDelete);
  •                 if (!fileToDelete.exists()) {
  •                         logger.warn(&quot;Unable to delete nonexistent file: {}&quot;, fileToDelete);
  •                         return;
  •                 }
  •                 if (!fileToDelete.delete()) {
  •                         throw new IOException(&quot;Unable to delete spool file: &quot;
  •                                         &#43; fileToDelete);
  •                 }
  •                 // now we no longer need the meta file
  •                 deleteMetaFile();
  •         }

  •         /**
  •          * Find and open the oldest file in the chosen directory. If two or more
  •          * files are equally old, the file name with lower lexicographical value is
  •          * returned. If the directory is empty, this will return an absent option.
  •          */
  •         private Optional<FileInfo> getNextFile() {
  •                 /* Filter to exclude finished or hidden files */
  •                 FileFilter filter = new FileFilter() {
  •                         public boolean accept(File candidate) {
  •                                 String fileName = candidate.getName();
  •                                 if ((candidate.isDirectory())
  •                                                 || (fileName.endsWith(completedSuffix))
  •                                                 || (fileName.startsWith(&quot;.&quot;))
  •                                                 || ignorePattern.matcher(fileName).matches()) {
  •                                         return false;
  •                                 }
  •                                 return true;
  •                         }
  •                 };
  •                 List<File> candidateFiles = Arrays.asList(spoolDirectory
  •                                 .listFiles(filter));
  •                 if (candidateFiles.isEmpty()) {
  •                         return Optional.absent();
  •                 } else {
  •                         Collections.sort(candidateFiles, new Comparator<File>() {
  •                                 public int compare(File a, File b) {
  •                                         int timeComparison = new Long(a.lastModified())
  •                                                         .compareTo(new Long(b.lastModified()));
  •                                         if (timeComparison != 0) {
  •                                                 return timeComparison;
  •                                         } else {
  •                                                 return a.getName().compareTo(b.getName());
  •                                         }
  •                                 }
  •                         });
  •                         File nextFile = candidateFiles.get(0);
  •                         try {
  •                                 // roll the meta file, if needed
  •                                 String nextPath = nextFile.getPath();
  •                                 PositionTracker tracker = DurablePositionTracker.getInstance(
  •                                                 metaFile, nextPath);
  •                                 if (!tracker.getTarget().equals(nextPath)) {
  •                                         tracker.close();
  •                                         deleteMetaFile();
  •                                         tracker = DurablePositionTracker.getInstance(metaFile,
  •                                                         nextPath);
  •                                 }

  •                                 // sanity check
  •                                 Preconditions
  •                                                 .checkState(
  •                                                                 tracker.getTarget().equals(nextPath),
  •                                                                 &quot;Tracker target %s does not equal expected filename %s&quot;,
  •                                                                 tracker.getTarget(), nextPath);

  •                                 ResettableInputStream in = new ResettableFileInputStream(
  •                                                 nextFile, tracker,
  •                                                 ResettableFileInputStream.DEFAULT_BUF_SIZE,
  •                                                 inputCharset, decodeErrorPolicy);
  •                                 EventDeserializer deserializer = EventDeserializerFactory
  •                                                 .getInstance(deserializerType, deserializerContext, in);

  •                                 return Optional.of(new FileInfo(nextFile, deserializer));
  •                         } catch (FileNotFoundException e) {
  •                                 // File could have been deleted in the interim
  •                                 logger.warn(&quot;Could not find file: &quot; &#43; nextFile, e);
  •                                 return Optional.absent();
  •                         } catch (IOException e) {
  •                                 logger.error(&quot;Exception opening file: &quot; &#43; nextFile, e);
  •                                 return Optional.absent();
  •                         }
  •                 }
  •         }

  •         private void deleteMetaFile() throws IOException {
  •                 if (metaFile.exists() && !metaFile.delete()) {
  •                         throw new IOException(&quot;Unable to delete old meta file &quot; &#43; metaFile);
  •                 }
  •         }

  •         /** An immutable class with information about a file being processed. */
  •         private static class FileInfo {
  •                 private final File file;
  •                 private final long length;
  •                 private final long lastModified;
  •                 private final EventDeserializer deserializer;

  •                 public FileInfo(File file, EventDeserializer deserializer) {
  •                         this.file = file;
  •                         this.length = file.length();
  •                         this.lastModified = file.lastModified();
  •                         this.deserializer = deserializer;
  •                 }

  •                 public long getLength() {
  •                         return length;
  •                 }

  •                 public long getLastModified() {
  •                         return lastModified;
  •                 }

  •                 public EventDeserializer getDeserializer() {
  •                         return deserializer;
  •                 }

  •                 public File getFile() {
  •                         return file;
  •                 }
  •         }

  •         @InterfaceAudience.Private
  •         @InterfaceStability.Unstable
  •         static enum DeletePolicy {
  •                 NEVER, IMMEDIATE, DELAY
  •         }

  •         /**
  •          * Special builder class for ReliableSpoolingFileEventReader
  •          */
  •         public static class Builder {
  •                 private File spoolDirectory;
  •                 private String completedSuffix = SpoolDirectorySourceConfigurationExtConstants.SPOOLED_FILE_SUFFIX;
  •                 private String ignorePattern = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_IGNORE_PAT;
  •                 private String trackerDirPath = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_TRACKER_DIR;
  •                 private Boolean annotateFileName = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILE_HEADER;
  •                 private String fileNameHeader = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_HEADER_KEY;
  •                 private Boolean annotateBaseName = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_BASENAME_HEADER;
  •                 private String baseNameHeader = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_BASENAME_HEADER_KEY;
  •                 private String deserializerType = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_DESERIALIZER;
  •                 private Context deserializerContext = new Context();
  •                 private String deletePolicy = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_DELETE_POLICY;
  •                 private String inputCharset = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_INPUT_CHARSET;
  •                 private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy
  •                                 .valueOf(SpoolDirectorySourceConfigurationExtConstants.DEFAULT_DECODE_ERROR_POLICY
  •                                                 .toUpperCase());

  •                 // 增加代码开始

  •                 private Boolean annotateFileNameExtractor = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR;
  •                 private String fileNameExtractorHeader = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_HEADER_KEY;
  •                 private String fileNameExtractorPattern = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_PATTERN;
  •                 private Boolean convertToTimestamp = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP;

  •                 private String dateTimeFormat = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_DATETIME_FORMAT;

  •                 private Boolean splitFileName = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_SPLIT_FILENAME;
  •                 private String splitBy = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_SPLITY_BY;
  •                 private String splitBaseNameHeader = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_SPLIT_BASENAME_HEADER;

  •                 public Builder annotateFileNameExtractor(
  •                                 Boolean annotateFileNameExtractor) {
  •                         this.annotateFileNameExtractor = annotateFileNameExtractor;
  •                         return this;
  •                 }

  •                 public Builder fileNameExtractorHeader(String fileNameExtractorHeader) {
  •                         this.fileNameExtractorHeader = fileNameExtractorHeader;
  •                         return this;
  •                 }

  •                 public Builder fileNameExtractorPattern(String fileNameExtractorPattern) {
  •                         this.fileNameExtractorPattern = fileNameExtractorPattern;
  •                         return this;
  •                 }

  •                 public Builder convertToTimestamp(Boolean convertToTimestamp) {
  •                         this.convertToTimestamp = convertToTimestamp;
  •                         return this;
  •                 }

  •                 public Builder dateTimeFormat(String dateTimeFormat) {
  •                         this.dateTimeFormat = dateTimeFormat;
  •                         return this;
  •                 }

  •                 public Builder splitFileName(Boolean splitFileName) {
  •                         this.splitFileName = splitFileName;
  •                         return this;
  •                 }

  •                 public Builder splitBy(String splitBy) {
  •                         this.splitBy = splitBy;
  •                         return this;
  •                 }

  •                 public Builder splitBaseNameHeader(String splitBaseNameHeader) {
  •                         this.splitBaseNameHeader = splitBaseNameHeader;
  •                         return this;
  •                 }

  •                 // 增加代码结束

  •                 public Builder spoolDirectory(File directory) {
  •                         this.spoolDirectory = directory;
  •                         return this;
  •                 }

  •                 public Builder completedSuffix(String completedSuffix) {
  •                         this.completedSuffix = completedSuffix;
  •                         return this;
  •                 }

  •                 public Builder ignorePattern(String ignorePattern) {
  •                         this.ignorePattern = ignorePattern;
  •                         return this;
  •                 }

  •                 public Builder trackerDirPath(String trackerDirPath) {
  •                         this.trackerDirPath = trackerDirPath;
  •                         return this;
  •                 }

  •                 public Builder annotateFileName(Boolean annotateFileName) {
  •                         this.annotateFileName = annotateFileName;
  •                         return this;
  •                 }

  •                 public Builder fileNameHeader(String fileNameHeader) {
  •                         this.fileNameHeader = fileNameHeader;
  •                         return this;
  •                 }

  •                 public Builder annotateBaseName(Boolean annotateBaseName) {
  •                         this.annotateBaseName = annotateBaseName;
  •                         return this;
  •                 }

  •                 public Builder baseNameHeader(String baseNameHeader) {
  •                         this.baseNameHeader = baseNameHeader;
  •                         return this;
  •                 }

  •                 public Builder deserializerType(String deserializerType) {
  •                         this.deserializerType = deserializerType;
  •                         return this;
  •                 }

  •                 public Builder deserializerContext(Context deserializerContext) {
  •                         this.deserializerContext = deserializerContext;
  •                         return this;
  •                 }

  •                 public Builder deletePolicy(String deletePolicy) {
  •                         this.deletePolicy = deletePolicy;
  •                         return this;
  •                 }

  •                 public Builder inputCharset(String inputCharset) {
  •                         this.inputCharset = inputCharset;
  •                         return this;
  •                 }

  •                 public Builder decodeErrorPolicy(DecodeErrorPolicy decodeErrorPolicy) {
  •                         this.decodeErrorPolicy = decodeErrorPolicy;
  •                         return this;
  •                 }

  •                 public ReliableSpoolingFileEventExtReader build() throws IOException {
  •                         return new ReliableSpoolingFileEventExtReader(spoolDirectory,
  •                                         completedSuffix, ignorePattern, trackerDirPath,
  •                                         annotateFileName, fileNameHeader, annotateBaseName,
  •                                         baseNameHeader, deserializerType, deserializerContext,
  •                                         deletePolicy, inputCharset, decodeErrorPolicy,
  •                                         annotateFileNameExtractor, fileNameExtractorHeader,
  •                                         fileNameExtractorPattern, convertToTimestamp,
  •                                         dateTimeFormat, splitFileName, splitBy, splitBaseNameHeader);
  •                 }
  •         }

  • }

[color=#336699!important]复制代码

  • /*
  • * Licensed to the Apache Software Foundation (ASF) under one or more
  • * contributor license agreements. See the NOTICE file distributed with this
  • * work for additional information regarding copyright ownership. The ASF
  • * licenses this file to you under the Apache License, Version 2.0 (the
  • * &quot;License&quot;); you may not use this file except in compliance with the License.
  • * You may obtain a copy of the License at
  • *
  • * http://www.apache.org/licenses/LICENSE-2.0
  • *
  • * Unless required by applicable law or agreed to in writing, software
  • * distributed under the License is distributed on an &quot;AS IS&quot; BASIS, WITHOUT
  • * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  • * License for the specific language governing permissions and limitations under
  • * the License.
  • */

  • package com.besttone.flume;

  • import org.apache.flume.serialization.DecodeErrorPolicy;

  • public class SpoolDirectorySourceConfigurationExtConstants {
  •         /** Directory where files are deposited. */
  •         public static final String SPOOL_DIRECTORY = &quot;spoolDir&quot;;

  •         /** Suffix appended to files when they are finished being sent. */
  •         public static final String SPOOLED_FILE_SUFFIX = &quot;fileSuffix&quot;;
  •         public static final String DEFAULT_SPOOLED_FILE_SUFFIX = &quot;.COMPLETED&quot;;

  •         /** Header in which to put absolute path filename. */
  •         public static final String FILENAME_HEADER_KEY = &quot;fileHeaderKey&quot;;
  •         public static final String DEFAULT_FILENAME_HEADER_KEY = &quot;file&quot;;

  •         /** Whether to include absolute path filename in a header. */
  •         public static final String FILENAME_HEADER = &quot;fileHeader&quot;;
  •         public static final boolean DEFAULT_FILE_HEADER = false;

  •         /** Header in which to put the basename of file. */
  •         public static final String BASENAME_HEADER_KEY = &quot;basenameHeaderKey&quot;;
  •         public static final String DEFAULT_BASENAME_HEADER_KEY = &quot;basename&quot;;

  •         /** Whether to include the basename of a file in a header. */
  •         public static final String BASENAME_HEADER = &quot;basenameHeader&quot;;
  •         public static final boolean DEFAULT_BASENAME_HEADER = false;

  •         /** What size to batch with before sending to ChannelProcessor. */
  •         public static final String BATCH_SIZE = &quot;batchSize&quot;;
  •         public static final int DEFAULT_BATCH_SIZE = 100;

  •         /** Maximum number of lines to buffer between commits. */
  •         @Deprecated
  •         public static final String BUFFER_MAX_LINES = &quot;bufferMaxLines&quot;;
  •         @Deprecated
  •         public static final int DEFAULT_BUFFER_MAX_LINES = 100;

  •         /** Maximum length of line (in characters) in buffer between commits. */
  •         @Deprecated
  •         public static final String BUFFER_MAX_LINE_LENGTH = &quot;bufferMaxLineLength&quot;;
  •         @Deprecated
  •         public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000;

  •         /** Pattern of files to ignore */
  •         public static final String IGNORE_PAT = &quot;ignorePattern&quot;;
  •         public static final String DEFAULT_IGNORE_PAT = &quot;^[        DISCUZ_CODE_1        ]quot;; // no effect

  •         /** Directory to store metadata about files being processed */
  •         public static final String TRACKER_DIR = &quot;trackerDir&quot;;
  •         public static final String DEFAULT_TRACKER_DIR = &quot;.flumespool&quot;;

  •         /** Deserializer to use to parse the file data into Flume Events */
  •         public static final String DESERIALIZER = &quot;deserializer&quot;;
  •         public static final String DEFAULT_DESERIALIZER = &quot;LINE&quot;;

  •         public static final String DELETE_POLICY = &quot;deletePolicy&quot;;
  •         public static final String DEFAULT_DELETE_POLICY = &quot;never&quot;;

  •         /** Character set used when reading the input. */
  •         public static final String INPUT_CHARSET = &quot;inputCharset&quot;;
  •         public static final String DEFAULT_INPUT_CHARSET = &quot;UTF-8&quot;;

  •         /** What to do when there is a character set decoding error. */
  •         public static final String DECODE_ERROR_POLICY = &quot;decodeErrorPolicy&quot;;
  •         public static final String DEFAULT_DECODE_ERROR_POLICY = DecodeErrorPolicy.FAIL
  •                         .name();

  •         public static final String MAX_BACKOFF = &quot;maxBackoff&quot;;

  •         public static final Integer DEFAULT_MAX_BACKOFF = 4000;

  •         // 增加代码开始
  •         public static final String FILENAME_EXTRACTOR = &quot;fileExtractor&quot;;
  •         public static final boolean DEFAULT_FILENAME_EXTRACTOR = false;

  •         public static final String FILENAME_EXTRACTOR_HEADER_KEY = &quot;fileExtractorHeaderKey&quot;;
  •         public static final String DEFAULT_FILENAME_EXTRACTOR_HEADER_KEY = &quot;fileExtractorHeader&quot;;

  •         public static final String FILENAME_EXTRACTOR_PATTERN = &quot;fileExtractorPattern&quot;;
  •         public static final String DEFAULT_FILENAME_EXTRACTOR_PATTERN = &quot;(.)*&quot;;

  •         public static final String FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP = &quot;convertToTimestamp&quot;;
  •         public static final boolean DEFAULT_FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP = false;

  •         public static final String FILENAME_EXTRACTOR_DATETIME_FORMAT = &quot;dateTimeFormat&quot;;
  •         public static final String DEFAULT_FILENAME_EXTRACTOR_DATETIME_FORMAT = &quot;yyyy-MM-dd&quot;;


  •         public static final String SPLIT_FILENAME = &quot;splitFileName&quot;;
  •         public static final boolean DEFAULT_SPLIT_FILENAME = false;

  •         public static final String SPLITY_BY = &quot;splitBy&quot;;
  •         public static final String DEFAULT_SPLITY_BY = &quot;\\.&quot;;

  •         public static final String SPLIT_BASENAME_HEADER = &quot;splitBaseNameHeader&quot;;
  •         public static final String DEFAULT_SPLIT_BASENAME_HEADER = &quot;fileNameSplit&quot;;
  •         // 增加代码结束

  • }

[color=#336699!important]复制代码

  • package com.besttone.flume;

  • import static com.besttone.flume.SpoolDirectorySourceConfigurationExtConstants.*;


  • import java.io.File;
  • import java.io.IOException;
  • import java.util.List;
  • import java.util.concurrent.Executors;
  • import java.util.concurrent.ScheduledExecutorService;
  • import java.util.concurrent.TimeUnit;

  • import org.apache.flume.ChannelException;
  • import org.apache.flume.Context;
  • import org.apache.flume.Event;
  • import org.apache.flume.EventDrivenSource;
  • import org.apache.flume.FlumeException;
  • import org.apache.flume.conf.Configurable;
  • import org.apache.flume.instrumentation.SourceCounter;
  • import org.apache.flume.serialization.DecodeErrorPolicy;
  • import org.apache.flume.serialization.LineDeserializer;
  • import org.apache.flume.source.AbstractSource;
  • import org.slf4j.Logger;
  • import org.slf4j.LoggerFactory;

  • import com.google.common.annotations.VisibleForTesting;
  • import com.google.common.base.Preconditions;
  • import com.google.common.base.Throwables;

  • public class SpoolDirectoryExtSource extends AbstractSource implements
  •                 Configurable, EventDrivenSource {

  •         private static final Logger logger = LoggerFactory
  •                         .getLogger(SpoolDirectoryExtSource.class);

  •         // Delay used when polling for new files
  •         private static final int POLL_DELAY_MS = 500;

  •         /* Config options */
  •         private String completedSuffix;
  •         private String spoolDirectory;
  •         private boolean fileHeader;
  •         private String fileHeaderKey;
  •         private boolean basenameHeader;
  •         private String basenameHeaderKey;
  •         private int batchSize;
  •         private String ignorePattern;
  •         private String trackerDirPath;
  •         private String deserializerType;
  •         private Context deserializerContext;
  •         private String deletePolicy;
  •         private String inputCharset;
  •         private DecodeErrorPolicy decodeErrorPolicy;
  •         private volatile boolean hasFatalError = false;

  •         private SourceCounter sourceCounter;
  •         ReliableSpoolingFileEventExtReader reader;
  •         private ScheduledExecutorService executor;
  •         private boolean backoff = true;
  •         private boolean hitChannelException = false;
  •         private int maxBackoff;

  •         // 增加代码开始

  •         private Boolean annotateFileNameExtractor;
  •         private String fileNameExtractorHeader;
  •         private String fileNameExtractorPattern;
  •         private Boolean convertToTimestamp;

  •         private String dateTimeFormat;

  •         private boolean splitFileName;
  •         private  String splitBy;
  •         private  String splitBaseNameHeader;

  •         // 增加代码结束

  •         @Override
  •         public synchronized void start() {
  •                 logger.info(&quot;SpoolDirectorySource source starting with directory: {}&quot;,
  •                                 spoolDirectory);

  •                 executor = Executors.newSingleThreadScheduledExecutor();

  •                 File directory = new File(spoolDirectory);
  •                 try {
  •                         reader = new ReliableSpoolingFileEventExtReader.Builder()
  •                                         .spoolDirectory(directory).completedSuffix(completedSuffix)
  •                                         .ignorePattern(ignorePattern)
  •                                         .trackerDirPath(trackerDirPath)
  •                                         .annotateFileName(fileHeader).fileNameHeader(fileHeaderKey)
  •                                         .annotateBaseName(basenameHeader)
  •                                         .baseNameHeader(basenameHeaderKey)
  •                                         .deserializerType(deserializerType)
  •                                         .deserializerContext(deserializerContext)
  •                                         .deletePolicy(deletePolicy).inputCharset(inputCharset)
  •                                         .decodeErrorPolicy(decodeErrorPolicy)
  •                                         .annotateFileNameExtractor(annotateFileNameExtractor)
  •                                         .fileNameExtractorHeader(fileNameExtractorHeader)
  •                                         .fileNameExtractorPattern(fileNameExtractorPattern)
  •                                         .convertToTimestamp(convertToTimestamp)
  •                                         .dateTimeFormat(dateTimeFormat)
  •                                         .splitFileName(splitFileName).splitBy(splitBy)
  •                                         .splitBaseNameHeader(splitBaseNameHeader).build();
  •                 } catch (IOException ioe) {
  •                         throw new FlumeException(
  •                                         &quot;Error instantiating spooling event parser&quot;, ioe);
  •                 }

  •                 Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
  •                 executor.scheduleWithFixedDelay(runner, 0, POLL_DELAY_MS,
  •                                 TimeUnit.MILLISECONDS);

  •                 super.start();
  •                 logger.debug(&quot;SpoolDirectorySource source started&quot;);
  •                 sourceCounter.start();
  •         }

  •         @Override
  •         public synchronized void stop() {
  •                 executor.shutdown();
  •                 try {
  •                         executor.awaitTermination(10L, TimeUnit.SECONDS);
  •                 } catch (InterruptedException ex) {
  •                         logger.info(&quot;Interrupted while awaiting termination&quot;, ex);
  •                 }
  •                 executor.shutdownNow();

  •                 super.stop();
  •                 sourceCounter.stop();
  •                 logger.info(&quot;SpoolDir source {} stopped. Metrics: {}&quot;, getName(),
  •                                 sourceCounter);
  •         }

  •         @Override
  •         public String toString() {
  •                 return &quot;Spool Directory source &quot; &#43; getName() &#43; &quot;: { spoolDir: &quot;
  •                                 &#43; spoolDirectory &#43; &quot; }&quot;;
  •         }

  •         @Override
  •         public synchronized void configure(Context context) {
  •                 spoolDirectory = context.getString(SPOOL_DIRECTORY);
  •                 Preconditions.checkState(spoolDirectory != null,
  •                                 &quot;Configuration must specify a spooling directory&quot;);

  •                 completedSuffix = context.getString(SPOOLED_FILE_SUFFIX,
  •                                 DEFAULT_SPOOLED_FILE_SUFFIX);
  •                 deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY);
  •                 fileHeader = context.getBoolean(FILENAME_HEADER, DEFAULT_FILE_HEADER);
  •                 fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
  •                                 DEFAULT_FILENAME_HEADER_KEY);
  •                 basenameHeader = context.getBoolean(BASENAME_HEADER,
  •                                 DEFAULT_BASENAME_HEADER);
  •                 basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
  •                                 DEFAULT_BASENAME_HEADER_KEY);
  •                 batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
  •                 inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
  •                 decodeErrorPolicy = DecodeErrorPolicy
  •                                 .valueOf(context.getString(DECODE_ERROR_POLICY,
  •                                                 DEFAULT_DECODE_ERROR_POLICY).toUpperCase());

  •                 ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
  •                 trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);

  •                 deserializerType = context
  •                                 .getString(DESERIALIZER, DEFAULT_DESERIALIZER);
  •                 deserializerContext = new Context(context.getSubProperties(DESERIALIZER
  •                                 &#43; &quot;.&quot;));

  •                 // &quot;Hack&quot; to support backwards compatibility with previous generation of
  •                 // spooling directory source, which did not support deserializers
  •                 Integer bufferMaxLineLength = context
  •                                 .getInteger(BUFFER_MAX_LINE_LENGTH);
  •                 if (bufferMaxLineLength != null && deserializerType != null
  •                                 && deserializerType.equalsIgnoreCase(DEFAULT_DESERIALIZER)) {
  •                         deserializerContext.put(LineDeserializer.MAXLINE_KEY,
  •                                         bufferMaxLineLength.toString());
  •                 }

  •                 maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF);
  •                 if (sourceCounter == null) {
  •                         sourceCounter = new SourceCounter(getName());
  •                 }

  •                 //增加代码开始

  •                 annotateFileNameExtractor=context.getBoolean(FILENAME_EXTRACTOR, DEFAULT_FILENAME_EXTRACTOR);
  •                 fileNameExtractorHeader=context.getString(FILENAME_EXTRACTOR_HEADER_KEY, DEFAULT_FILENAME_EXTRACTOR_HEADER_KEY);
  •                 fileNameExtractorPattern=context.getString(FILENAME_EXTRACTOR_PATTERN,DEFAULT_FILENAME_EXTRACTOR_PATTERN);
  •                 convertToTimestamp=context.getBoolean(FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP, DEFAULT_FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP);
  •                 dateTimeFormat=context.getString(FILENAME_EXTRACTOR_DATETIME_FORMAT, DEFAULT_FILENAME_EXTRACTOR_DATETIME_FORMAT);

  •                 splitFileName=context.getBoolean(SPLIT_FILENAME, DEFAULT_SPLIT_FILENAME);
  •                 splitBy=context.getString(SPLITY_BY, DEFAULT_SPLITY_BY);
  •                 splitBaseNameHeader=context.getString(SPLIT_BASENAME_HEADER, DEFAULT_SPLIT_BASENAME_HEADER);



  •                 //增加代码结束
  •         }

  •         @VisibleForTesting
  •         protected boolean hasFatalError() {
  •                 return hasFatalError;
  •         }

  •         /**
  •          * The class always backs off, this exists only so that we can test without
  •          * taking a really long time.
  •          *
  •          * @param backoff
  •          *            - whether the source should backoff if the channel is full
  •          */
  •         @VisibleForTesting
  •         protected void setBackOff(boolean backoff) {
  •                 this.backoff = backoff;
  •         }

  •         @VisibleForTesting
  •         protected boolean hitChannelException() {
  •                 return hitChannelException;
  •         }

  •         @VisibleForTesting
  •         protected SourceCounter getSourceCounter() {
  •                 return sourceCounter;
  •         }

  •         private class SpoolDirectoryRunnable implements Runnable {
  •                 private ReliableSpoolingFileEventExtReader reader;
  •                 private SourceCounter sourceCounter;

  •                 public SpoolDirectoryRunnable(
  •                                 ReliableSpoolingFileEventExtReader reader,
  •                                 SourceCounter sourceCounter) {
  •                         this.reader = reader;
  •                         this.sourceCounter = sourceCounter;
  •                 }

  •                 @Override
  •                 public void run() {
  •                         int backoffInterval = 250;
  •                         try {
  •                                 while (!Thread.interrupted()) {
  •                                         List<Event> events = reader.readEvents(batchSize);
  •                                         if (events.isEmpty()) {
  •                                                 break;
  •                                         }
  •                                         sourceCounter.addToEventReceivedCount(events.size());
  •                                         sourceCounter.incrementAppendBatchReceivedCount();

  •                                         try {
  •                                                 getChannelProcessor().processEventBatch(events);
  •                                                 reader.commit();
  •                                         } catch (ChannelException ex) {
  •                                                 logger.warn(&quot;The channel is full, and cannot write data now. The &quot;
  •                                                                 &#43; &quot;source will try again after &quot;
  •                                                                 &#43; String.valueOf(backoffInterval)
  •                                                                 &#43; &quot; milliseconds&quot;);
  •                                                 hitChannelException = true;
  •                                                 if (backoff) {
  •                                                         TimeUnit.MILLISECONDS.sleep(backoffInterval);
  •                                                         backoffInterval = backoffInterval << 1;
  •                                                         backoffInterval = backoffInterval >= maxBackoff ? maxBackoff
  •                                                                         : backoffInterval;
  •                                                 }
  •                                                 continue;
  •                                         }
  •                                         backoffInterval = 250;
  •                                         sourceCounter.addToEventAcceptedCount(events.size());
  •                                         sourceCounter.incrementAppendBatchAcceptedCount();
  •                                 }
  •                                 logger.info(&quot;Spooling Directory Source runner has shutdown.&quot;);
  •                         } catch (Throwable t) {
  •                                 logger.error(
  •                                                 &quot;FATAL: &quot;
  •                                                                 &#43; SpoolDirectoryExtSource.this.toString()
  •                                                                 &#43; &quot;: &quot;
  •                                                                 &#43; &quot;Uncaught exception in SpoolDirectorySource thread. &quot;
  •                                                                 &#43; &quot;Restart or reconfigure Flume to continue processing.&quot;,
  •                                                 t);
  •                                 hasFatalError = true;
  •                                 Throwables.propagate(t);
  •                         }
  •                 }
  •         }
  • }

[color=#336699!important]复制代码主要提供了如下几个设置参数:

tier1.sources.source1.type=com.besttone.flume.SpoolDirectoryExtSource   #写类的全路径名
tier1.sources.source1.spoolDir=/opt/logs   #监控的目录
tier1.sources.source1.splitFileName=true   #是否分隔文件名,并把分割后的内容添加到header中,默认false
tier1.sources.source1.splitBy=\\.                   #以什么符号分隔,默认是&quot;.&quot;分隔
tier1.sources.source1.splitBaseNameHeader=fileNameSplit  #分割后写入header的key的前缀,比如a.log.2014-07-31,按“.&quot;分隔,
则header中有fileNameSplit0=a,fileNameSplit1=log,fileNameSplit2=2014-07-31

(其中还有扩展一个通过正则表达式抽取文件名的功能也在里面,我们这里不用到,就不介绍了)

扩展了这个source之后,前面的那个需求就很容易实现了,只需要:
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{fileNameSplit0}/%{fileNameSplit2}
a.log.2014-07-31这个文件的内容就会保存到hdfs://master68:8020/flume/events/a/2014-07-31目录下面去了。

接下来我们说说如何部署这个我们扩展的自定义的spooling directory source(基于CM的设置)。
首先,我们把上面三个类打成JAR包:SpoolDirectoryExtSource.jar
CM的flume插件目录为:/var/lib/flume-ng/plugins.d

然后再你需要使用这个source的agent上的/var/lib/flume-ng/plugins.d目录下面创建SpoolDirectoryExtSource目录以及子目录lib,libext,native,lib是放插件JAR的目录,libext是放插件的依赖JAR的目录,native放使用到的原生库(如果有用到的话)。

我们这里没有使用到其他的依赖,于是就把SpoolDirectoryExtSource.jar放到lib目录下就好了,最终的目录结构:

  • plugins.d/
  • plugins.d/SpoolDirectoryExtSource/
  • plugins.d/SpoolDirectoryExtSource/lib/SpoolDirectoryExtSource.jar
  • plugins.d/SpoolDirectoryExtSource/libext/
  • plugins.d/SpoolDirectoryExtSource/native/
[color=#336699!important]复制代码重新启动flume agent,flume就会自动装载我们的插件,这样在flume.conf中就可以使用全路径类名配置type属性了。

最终flume.conf配置如下:

  • tier1.sources=source1
  • tier1.channels=channel1
  • tier1.sinks=sink1
  • tier1.sources.source1.type=com.besttone.flume.SpoolDirectoryExtSource
  • tier1.sources.source1.spoolDir=/opt/logs
  • tier1.sources.source1.splitFileName=true
  • tier1.sources.source1.splitBy=\\.
  • tier1.sources.source1.splitBaseNameHeader=fileNameSplit
  • tier1.sources.source1.channels=channel1
  • tier1.sinks.sink1.type=hdfs
  • tier1.sinks.sink1.channel=channel1
  • tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{fileNameSplit0}/%{fileNameSplit2}
  • tier1.sinks.sink1.hdfs.round=true
  • tier1.sinks.sink1.hdfs.roundValue=10
  • tier1.sinks.sink1.hdfs.roundUnit=minute
  • tier1.sinks.sink1.hdfs.fileType=DataStream
  • tier1.sinks.sink1.hdfs.writeFormat=Text
  • tier1.sinks.sink1.hdfs.rollInterval=0
  • tier1.sinks.sink1.hdfs.rollSize=10240
  • tier1.sinks.sink1.hdfs.rollCount=0
  • tier1.sinks.sink1.hdfs.idleTimeout=60
  • tier1.channels.channel1.type=memory
  • tier1.channels.channel1.capacity=10000
  • tier1.channels.channel1.transactionCapacity=1000
  • tier1.channels.channel1.keep-alive=30

[color=#336699!important]复制代码
附上一张用logger作为sink的查看日志文件的截图:
DSC0001.jpg

  

  转自:
  http://www.aboutyun.com/thread-12043-1-1.html
  原文来自:
  http://blog.csdn.net/xiao_jun_0820/article/details/38312091

  和
  http://blog.csdn.net/xiao_jun_0820/article/details/38269079



flume学习(五):flume将log4j日志数据写入到hdfs
flume学习(六):使用hive来分析flume收集的日志数据
flume学习(七)、(八):如何使用event header中的key&#20540;以及自定义source
flume学习(九):自定义拦截器
flume学习(十):使用Morphline Interceptor
flume学习(十一):如何使用Spooling Directory Source

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144407-1-1.html 上篇帖子: spark-streaming连接flume时报错org.jboss.netty.channel.ChannelException: Failed to bin 下篇帖子: flume详细介绍,安装,配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表