Continuing from the previous post: once we have received the data, we need to turn it into the corresponding log files on a single, unified log server. The only file-oriented sink that ships with Flume is RollingFileSink, and it is of no use to us here, so we wrote our own FileSink.
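The sink assumes every event body carries three fields joined by a separator defined in com.ule.flume.util.Constants: the log content, the target sub-directory, and the log file name. That class is not shown in this post, so treat the separator value below as an assumption; a minimal sketch of what it might look like:
package com.ule.flume.util;
public class Constants {
// Hypothetical value; use whatever separator the upstream source or interceptor actually emits.
public static final String SPLIT = "&&";
}
Under that assumption, an event body like "GET /item/42 200\n&&orders/app01&&access.log" ends up in <file.directory>/orders/app01/access.log.<yyyy-MM-dd>. The full sink implementation: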
package com.ule.flume.sink.file;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import com.google.common.base.Preconditions;
import com.ule.flume.util.Constants;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.flume.sink.AbstractSink;
import org.apache.log4j.Logger;
public class FileSink extends AbstractSink implements Configurable {
private static final Logger logger = Logger.getLogger("sinklog");
// Carried over from RollingFileSink: roll a new file every rollInterval seconds; 0 disables rolling and writes all events to a single file.
// Note that this sink actually rolls by date (see dateType below); the interval-based rollService is never started.
private static final long defaultRollInterval = 30;
private static final int defaultBatchSize = 100;
private int batchSize = defaultBatchSize;
private String filePrefix = ""; // log file name prefix (read from config, but not used elsewhere in this version)
private String dateType = "yyyy-MM-dd"; // date pattern appended to output file names
// Per-file state, keyed by "<directory>/<path>/<logName>": last roll date, open stream, serializer.
// The raw Map declarations in the original would not compile against the typed calls below.
private static Map<String, String> fileMap = new HashMap<String, String>();
private static Map<String, OutputStream> streamMap = new HashMap<String, OutputStream>();
private static Map<String, EventSerializer> serializerMap = new HashMap<String, EventSerializer>();
private SimpleDateFormat format;
private File directory;
private long rollInterval;
private OutputStream outputStream;
private ScheduledExecutorService rollService;
private String serializerType;
private Context serializerContext;
private EventSerializer serializer;
private SinkCounter sinkCounter;
// private FileManager pathController;
private volatile boolean shouldRotate;
public FileSink() {
shouldRotate = false;
}
@Override
public void configure(Context context) {
String directory = context.getString("file.directory");
filePrefix = context.getString("file.filePrefix", "");
dateType = context.getString("file.dateType", "yyyy-MM-dd"); // fall back to the default pattern if unset
String rollInterval = context.getString("file.rollInterval");
serializerType = context.getString("sink.serializer", "TEXT");
serializerContext =
new Context(context.getSubProperties("sink." + EventSerializer.CTX_PREFIX));
Preconditions.checkArgument(directory != null, "Directory may not be null");
Preconditions.checkNotNull(serializerType, "Serializer type is undefined");
if (rollInterval == null) {
this.rollInterval = defaultRollInterval;
} else {
this.rollInterval = Long.parseLong(rollInterval);
}
batchSize = context.getInteger("file.batchSize", defaultBatchSize);
this.directory = new File(directory);
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
}
@Override
public synchronized void start() {
logger.info("Starting {}..."+ this);
sinkCounter.start();
fileMap.put("start", "start");
super.start();
}
@Override
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event = null;
Status result = Status.READY;
String path = "";
String logName = "";
String infosString = "";
try {
transaction.begin();
event = channel.take();
if (event != null) {
String eventinfos = new String(event.getBody());
logger.debug("eventinfos:" + eventinfos);
String[] infosStrings = eventinfos.split(Constants.SPLIT);
if (infosStrings.length >= 3) {
// field 0: log content, field 1: target sub-directory, field 2: log file name.
// Strip the trailing newline from the content if present; the original lastIndexOf-based
// substring threw StringIndexOutOfBoundsException when no newline existed.
int newlineIdx = infosStrings[0].lastIndexOf("\n");
infosString = newlineIdx >= 0 ? infosStrings[0].substring(0, newlineIdx) : infosStrings[0];
path = infosStrings[1];
logName = infosStrings[2];
logger.debug("path:" + path);
logger.debug("logName:" + logName);
logger.debug("infosString:" + infosString.toString());
event.setBody(infosString.toString().getBytes());
}
} else {
result = Status.BACKOFF;
}
if (StringUtils.isNotBlank(path) && StringUtils.isNotBlank(logName)) {
// each event names its own target sub-directory, so create it on demand
File file = new File(directory + File.separator + path);
if (!file.exists()) {
logger.debug("mkdirFile:" + file.mkdirs());
}
format = new SimpleDateFormat(dateType);
String key = file + File.separator + logName; // per-file state key: full path of the target log
String nowDate = format.format(System.currentTimeMillis());
if ("start".equals(fileMap.get("start"))) {//第一次启动
sinkCounter.incrementConnectionCreatedCount();
fileMap.put("start", "end");
}
String fileDate = fileMap.get(key);
File logFile = new File(key + "." + nowDate); // e.g. access.log.2016-01-01
if (StringUtils.isNotBlank(fileDate)) { // we have written to this key before
if (nowDate.equals(fileDate)) { // still the same day: append through the cached serializer
serializerMap.get(key).write(event);
serializerMap.get(key).flush();
streamMap.get(key).flush();
} else { // the date has rolled over: close the old file and open today's
try {
serializerMap.get(key).flush();
serializerMap.get(key).beforeClose();
streamMap.get(key).close();
} catch (IOException e) {
sinkCounter.incrementConnectionFailedCount();
throw new EventDeliveryException("Unable to rotate file "
+ key + " while delivering event", e);
} finally {
serializerMap.remove(key);
streamMap.remove(key);
}
fileMap.put(key, nowDate);
streamMap.put(key, new BufferedOutputStream(new FileOutputStream(logFile)));
serializerMap.put(key, EventSerializerFactory.getInstance(
serializerType, serializerContext, streamMap.get(key)));
serializerMap.get(key).afterCreate();
serializerMap.get(key).write(event);
serializerMap.get(key).flush();
streamMap.get(key).flush();
}
} else { // first event for this key: open the stream in append mode in case the file already exists
// (the target directory was created above, so the original dir-exists branching collapses to one case)
fileMap.put(key, nowDate);
streamMap.put(key, new BufferedOutputStream(new FileOutputStream(logFile, true)));
serializerMap.put(key, EventSerializerFactory.getInstance(
serializerType, serializerContext, streamMap.get(key)));
serializerMap.get(key).afterCreate();
serializerMap.get(key).write(event);
serializerMap.get(key).flush();
streamMap.get(key).flush();
}
}
// one event per transaction, so attempt and success counters move in lockstep
sinkCounter.incrementEventDrainAttemptCount();
transaction.commit();
sinkCounter.addToEventDrainSuccessCount(1);
} catch (Exception ex) {
transaction.rollback();
throw new EventDeliveryException("Failed to process transaction", ex);
} finally {
transaction.close();
}
return result;
}
@Override
public synchronized void stop() {
logger.info("RollingFile sink {} stopping..."+ getName());
sinkCounter.stop();
super.stop();
for (String key : serializerMap.keySet()) {
try {
serializerMap.get(key).flush();
serializerMap.get(key).beforeClose();
streamMap.get(key).close();
} catch (IOException e) {
logger.error("Unable to close output stream. Exception follows.", e);
}
}
sinkCounter.incrementConnectionClosedCount();
fileMap.clear();
serializerMap.clear();
streamMap.clear();
logger.debug("RollingFile sink {} stopped. Event metrics: {} = "+" getName():"+
getName() +" sinkCounter:"+ sinkCounter);
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
public long getRollInterval() {
return rollInterval;
}
public void setRollInterval(long rollInterval) {
this.rollInterval = rollInterval;
}
}
Because the events this sink receives arrive from many different machines, yet each one must be written out under the directory and file name it carries in its own body, we cannot funnel everything through a single shared, synchronized writer. Every target file gets its own independent output stream and serializer, kept separate in memory; that is exactly what the three per-key maps above provide.
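For completeness, here is a sketch of how this sink could be wired into a Flume agent. The property names match exactly what configure() reads above; the agent, channel, and directory names are placeholders:
agent1.sinks.k1.type = com.ule.flume.sink.file.FileSink
agent1.sinks.k1.channel = c1
agent1.sinks.k1.file.directory = /data/unified-logs
agent1.sinks.k1.file.dateType = yyyy-MM-dd
agent1.sinks.k1.file.rollInterval = 30
agent1.sinks.k1.file.batchSize = 100
agent1.sinks.k1.sink.serializer = TEXT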