|
开一个监听持续间断的获取某个日志的续写的信息,并传入sink中,在flume默认的组建中并没用这样的功能,只能自己根据业务就行开发,下面flume获得source信息
概要:首先 我们在获得持续输出的日志并创建一个文件中记录我们获取这个日志的信息变化的位置,根据这个位置文件来完成,我们需要的断点续传功能.
所谓日志搬家我们必须要知道这个日志 是哪里来要搬到哪里去 这里是source我只做在哪里来,
首先上三段代码
package org.jueshizhanhun.flume.source.filemonitor;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.log4j.Logger;
import com.google.common.base.Preconditions;
import com.ule.flume.util.Constants;
public class FileMonitorSource extends AbstractSource implements Configurable, EventDrivenSource {
private static final Logger log = Logger.getLogger("sourcelog");
private String path = "";//设置本机ip
private long rollInterval = 10;//启动监听获取信息时间间隔
private boolean record = true;//是否开启获得IP和日志文件名 开关
private ChannelProcessor channelProcessor;
private RandomAccessFile monitorFile = null;
private File coreFile = null;
private long lastMod = 0L;
private String monitorFilePath = null;
private String monitorFileName = null;
private String positionFile = null;
private FileChannel monitorFileChannel = null;
private ByteBuffer buffer = ByteBuffer.allocate(1 |
|
|