/**
 * Reads this source's settings from the Flume context and prepares the monitored file
 * and its position log.
 *
 * <p>Configuration keys read here:
 * <ul>
 *   <li>{@code file} - path of the file to monitor (required);</li>
 *   <li>{@code rollInterval} - defaults to 10;</li>
 *   <li>{@code path} - defaults to {@code catchLocalIP().toString()};</li>
 *   <li>{@code record} - defaults to {@code true}.</li>
 * </ul>
 *
 * <p>If the monitored file or the position log cannot be initialised, the error is logged
 * and the method returns early, leaving the remaining fields (including
 * {@code sourceCounter}) untouched.
 *
 * @param context configuration supplied for this source
 * @throws IllegalArgumentException if the {@code file} key is not set
 */
@Override
public void configure(Context context) {
    charset = Charset.forName("UTF-8");
    decoder = charset.newDecoder();

    // The file to monitor.
    this.monitorFilePath = context.getString("file");
    this.rollInterval = context.getInteger("rollInterval", 10);
    this.path = context.getString("path", catchLocalIP().toString());
    this.record = context.getBoolean("record", true);

    // Validate before deriving anything from the path; otherwise a missing "file" key
    // would first yield a bogus "null.position" file name.
    Preconditions.checkArgument(monitorFilePath != null, "The file can not be null !");

    // The position file is stored alongside the monitored file.
    this.positionFile = monitorFilePath + ".position";

    // The file name is the last "/"-separated segment of the path. split() drops
    // trailing empty segments, so a path made only of separators yields no segments
    // and leaves monitorFileName unchanged.
    String[] segments = monitorFilePath.split("/");
    if (segments.length > 0) {
        monitorFileName = segments[segments.length - 1];
    }

    try {
        // Handle on the monitored file.
        coreFile = new File(monitorFilePath);
        // Last-modified time used as the baseline for change detection.
        lastMod = coreFile.lastModified();
    } catch (Exception e) {
        log.error("Initialize the File/FileChannel Error", e);
        return;
    }

    try {
        positionLog = new PositionLog(positionFile);
        // Offset currently stored in the position file.
        positionValue = positionLog.initPosition();
    } catch (Exception e) {
        log.error("Initialize the positionValue in File positionLog", e);
        return;
    }

    lastFileSize = positionValue;
    if (sourceCounter == null) {
        sourceCounter = new SourceCounter(getName());
    }
}
class FileMonitorThread implements Runnable {
/**
* a thread to check whether the file is modified
*/
@Override
public void run() {
{
log.info("FileMonitorThread running ...");
// coreFile = new File(monitorFilePath);
long nowModified = coreFile.lastModified();
// the file has been changed
if (lastMod != nowModified) {
log.debug(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>File modified ...");
// you must record the last modified and now file size as
// soon
// as possible
lastMod = nowModified;
nowFileSize = coreFile.length();
int readDataBytesLen = 0;
try {
log.debug("The LastlastFileSize:" +lastFileSize+" nowFileSize :"+nowFileSize);
// it indicated the file is rolled by log4j
if (nowFileSize <= lastFileSize) {
log.debug("The file size is changed to be lower,it indicated that the file is rolled by log4j.");
positionValue = 0L;
}
lastFileSize = nowFileSize;
monitorFile = new RandomAccessFile(coreFile, "r");
// you must be instantiate the file channel Object when
// the
// file
// changed
monitorFileChannel = monitorFile.getChannel();
monitorFileChannel.position(positionValue);
// read file content into buffer
int bytesRead = monitorFileChannel.read(buffer);
// this while for it can not read all the data when the
// file
// modified
while (bytesRead != -1) {
log.debug("How many bytes read in this loop ? --> {} = "+ bytesRead);
String contents = buffer2String(buffer);
// every read,the last byte is \n,this can make sure
// the
// integrity of read data
// include the \n
int lastLineBreak = contents.lastIndexOf("\n") + 1;
String readData = contents.substring(0, lastLineBreak);
byte[] readDataBytes = readData.getBytes();
readDataBytesLen = readDataBytes.length;
positionValue += readDataBytesLen;
// change the position value for next read
String infoString ="";
int infoLength = 0;
if (record) {
infoString = Constants.SPLIT+path+Constants.SPLIT+monitorFileName;
infoLength = infoString.getBytes().length;
log.info("infoString:" + infoString);
log.debug("data: "+(new String(readDataBytes) + infoString)+" end");
}
monitorFileChannel.position(positionValue+infoLength);
// headers.put(Constants.KEY_DATA_SIZE, String.valueOf(readDataBytesLen));
// headers.put(Constants.KEY_DATA_LINE, String.valueOf(readData.split("\n")));
sourceCounter.incrementEventReceivedCount();
// channelProcessor.processEvent(EventBuilder.withBody(readDataBytes,headers));
if (record) {
channelProcessor.processEvent(EventBuilder.withBody((new String(readDataBytes) + infoString).getBytes()));
}else {
channelProcessor.processEvent(EventBuilder.withBody(readDataBytes));
}
sourceCounter.addToEventAcceptedCount(1);
// channelProcessor.processEventBatch(getEventByReadData(readData));
log.debug("Change the next read position {} = "+ positionValue);
buffer.clear();
bytesRead = monitorFileChannel.read(buffer);
}
} catch (Exception e) {
log.error("Read data into Channel Error", e);
log.debug("Save the last positionValue {} into Disk File = positionValue- readDataBytesLen :"+ (positionValue- readDataBytesLen));
positionLog.setPosition(positionValue - readDataBytesLen);
}
counter++;
if (counter % Constants.POSITION_SAVE_COUNTER == 0) {
log.debug(
Constants.POSITION_SAVE_COUNTER
+ " times file modified checked,save the position Value {} into Disk file = "+"positionValue:"+
positionValue);
positionLog.setPosition(positionValue);
}
}
}
}
}
/**
 * Splits a chunk of text into lines and wraps each line in its own event.
 *
 * @param readData text whose lines are separated by {@code "\n"}
 * @return one event per element of {@code readData.split("\n")}, in the same order;
 *         each event body is that line's bytes in the platform default charset
 */
public List<Event> getEventByReadData(String readData) {
    List<Event> events = new ArrayList<Event>();
    // Each line becomes the body of its own event (the array itself has no getBytes()).
    for (String line : readData.split("\n")) {
        events.add(EventBuilder.withBody(line.getBytes()));
    }
    return events;
}
/**
 * Creates a position log without assigning a position file path;
 * {@code postionFile} is left at its default value.
 */
public PositionLog() {
    super();
}
/**
 * Creates a position log that uses the given file.
 *
 * @param postionFilePath path of the file in which the position is stored
 */
public PositionLog(String postionFilePath) {
    postionFile = postionFilePath;
}
/**
 * Prepares the position file and returns the position stored in it.
 *
 * <p>The file is created when it does not exist. An empty file is initialised with the
 * text {@code "0"} and 0 is returned; otherwise the result of {@link #getPosition()} is
 * returned (which is -1 when the stored value cannot be read or parsed).
 *
 * @return the stored position, or 0 for a freshly initialised file
 * @throws Exception if the file cannot be created, opened or written
 */
public long initPosition() throws Exception {
    filePath = postionFile;
    File file = new File(filePath);
    if (!file.exists()) {
        try {
            file.createNewFile();
            log.debug("Create the postionFile file");
        } catch (IOException e) {
            log.error("Create the postionFile error", e);
            throw e;
        }
    }
    try {
        boolean empty;
        // Open with read/write access.
        raf = new RandomAccessFile(filePath, "rw");
        try {
            this.positionFileChannel = raf.getChannel();
            empty = positionFileChannel.size() == 0;
            if (empty) {
                log.debug("The file content is null,init the value 0");
                ByteBuffer buffer = ByteBuffer.wrap("0".getBytes());
                while (buffer.hasRemaining()) {
                    positionFileChannel.write(buffer);
                }
            }
        } finally {
            // getPosition() opens its own handle, so this one is always released
            // here - on the empty path, the non-empty path and on failure alike.
            raf.close();
        }
        return empty ? 0L : getPosition();
    } catch (Exception e) {
        log.error("Init the position file error", e);
        throw e;
    }
}
/**
 * Reads the position stored in the position file.
 *
 * <p>The file content is interpreted as a decimal number; surrounding whitespace is
 * ignored.
 *
 * @return the stored position, or -1 if the file cannot be read or does not contain a
 *         parsable number (an empty file included); the failure is logged
 */
public long getPosition() {
    try {
        StringBuilder digits = new StringBuilder();
        raf = new RandomAccessFile(filePath, "rw");
        try {
            this.positionFileChannel = raf.getChannel();
            long fileSize = positionFileChannel.size();
            ByteBuffer buffer = ByteBuffer.allocate((int) fileSize);
            // Stop as soon as the buffer is full or the end of the file is reached.
            // A read into a buffer with no space left returns 0 rather than -1, so
            // the loop must not wait for -1 alone (a zero-length file would spin).
            while (buffer.hasRemaining() && positionFileChannel.read(buffer) != -1) {
                // keep filling the buffer
            }
            buffer.flip();
            while (buffer.hasRemaining()) {
                digits.append((char) buffer.get());
            }
        } finally {
            // Release the handle even when reading fails.
            raf.close();
        }
        return Long.parseLong(digits.toString().trim());
    } catch (Exception e) {
        log.error("Get Position Value Error", e);
        return -1;
    }
}
/**
 * Stores the given position in the position file, replacing the previous content.
 *
 * @param position the position to persist
 * @return {@code position} on success, or -1 if it is {@code null} or the file cannot be
 *         written; the failure is logged
 */
public long setPosition(Long position) {
    if (position == null) {
        // Nothing sensible to store; leave the file content as it is.
        log.error("Set Position Value Error");
        return -1;
    }
    try {
        raf = new RandomAccessFile(filePath, "rw");
        try {
            this.positionFileChannel = raf.getChannel();
            byte[] positionBytes = String.valueOf(position).getBytes();
            ByteBuffer buffer = ByteBuffer.wrap(positionBytes);
            while (buffer.hasRemaining()) {
                this.positionFileChannel.write(buffer);
            }
            // Writing starts at offset 0 and does not shrink the file, so a value with
            // fewer digits than the previous one would keep stale trailing digits
            // (writing "0" over "12345" would leave "02345"). Cut the file to exactly
            // what was written.
            this.positionFileChannel.truncate(positionBytes.length);
        } finally {
            // Release the handle even when writing fails.
            raf.close();
        }
        log.debug("Set Position Value Successfully {}", position);
        return position;
    } catch (Exception e) {
        log.error("Set Position Value Error", e);
        return -1;
    }
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package org.jueshizhanhun.flume.util;
public class Constants {
public static String SPLIT = ":jbgsn:";
public static long POSITION_INIT_VALUE = 0L;
public static String KEY_DATA_SIZE = "readDataSize";
public static String KEY_DATA_LINE = "readDataLine";