remington_young 发表于 2019-1-30 09:55:08

flume 日志搬家下半场

  续上面 获得资源后我们要将转换为相应的日志,落在统一的服务器中.
  在flume中的对file操作的sink只有RollingFileSink但这个对我们来一点用都没有,
package com.ule.flume.sink.file;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import com.google.common.base.Preconditions;
import com.ule.flume.util.Constants;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.flume.sink.AbstractSink;
import org.apache.log4j.Logger;
public class FileSink extends AbstractSink implements Configurable {
private static final Logger logger = Logger.getLogger("sinklog");
//每隔30秒滚动一个文件。指定0将禁用滚动,并导致所有事件被写入到一个单独的文件中
private static final long defaultRollInterval = 30;
private static final int defaultBatchSize = 100;
private int batchSize = defaultBatchSize;
private String filePrefix = "";    //文件前缀名
private String dateType = "yyyy-MM-dd";//按照日期类型输出文件
private static Map fileMap = new HashMap();
private static Map streamMap = new HashMap();
private static Map serializerMap = new HashMap();
privateSimpleDateFormat format;
private File directory;
private long rollInterval;
private OutputStream outputStream;
private ScheduledExecutorService rollService;
private String serializerType;
private Context serializerContext;
private EventSerializer serializer;
private SinkCounter sinkCounter;
//private FileManager pathController;
private volatile boolean shouldRotate;
public FileSink() {
    shouldRotate = false;
}
@Override
public void configure(Context context) {
    String directory = context.getString("file.directory");
    filePrefix = context.getString( "file.filePrefix");
    dateType = context.getString( "file.dateType");
    String rollInterval = context.getString("file.rollInterval");
    serializerType = context.getString("sink.serializer", "TEXT");
    serializerContext =
      new Context(context.getSubProperties("sink." + EventSerializer.CTX_PREFIX));
    Preconditions.checkArgument(directory != null, "Directory may not be null");
    Preconditions.checkNotNull(serializerType, "Serializer type is undefined");
    if (rollInterval == null) {
      this.rollInterval = defaultRollInterval;
    } else {
      this.rollInterval = Long.parseLong(rollInterval);
    }
    batchSize = context.getInteger("file.batchSize", defaultBatchSize);
    this.directory = new File(directory);
    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }
}
@Override
public synchronized void start() {
    logger.info("Starting {}..."+ this);
    sinkCounter.start();
    fileMap.put("start", "start");
    super.start();
}

@Override
public Status process() throws EventDeliveryException {
      Channel channel = getChannel();
      Transaction transaction = channel.getTransaction();
      Event event = null;
      Status result = Status.READY;
      String path = "";
      String logName = "";
      String infosString = "";
      try {
            transaction.begin();
            event = channel.take();
            if (event != null) {
            String eventinfos = new String(event.getBody());
            logger.debug("eventinfos:" + eventinfos);
            String[] infosStrings = eventinfos.split(Constants.SPLIT);
            if (infosStrings.length >= 3) {
                for (int i = 0; i < infosStrings.length; i++) {
                  switch (i) {
                  case 0:
                        infosString = infosStrings.substring(0,infosStrings.lastIndexOf("\n"));
                        break;
                  case 1:
                        path = infosStrings;
                        break;
                  case 2:
                        logName = infosStrings;
                        break;
                  }
                }
                logger.debug("path:" + path);
                logger.debug("logName:" + logName);
                logger.debug("infosString:" + infosString.toString());
                event.setBody(infosString.toString().getBytes());
            }
      }else {
             result = Status.BACKOFF;
      }
       if (StringUtils.isNotBlank(path)&&StringUtils.isNotBlank(logName)) {
            File file=new File(directory+File.separator+path);
            if(!file.exists()&& !file.isDirectory())      
            {      
               logger.debug("mkdirFile:" + file .mkdirs());
            }
            format =new SimpleDateFormat(dateType);
            String key = file+File.separator+logName;
            String nowDate = format.format(System.currentTimeMillis());
            if ("start".equals(fileMap.get("start"))) {//第一次启动
                sinkCounter.incrementConnectionCreatedCount();
                fileMap.put("start", "end");
            }
            String fileDate = fileMap.get(key);
             File logFile=new File(key+"."+nowDate);
            if (StringUtils.isNotBlank(fileDate)) {//value不为空 说明之前有该key的信息
                if (nowDate.equals(fileDate)) {//时间为当天时间
                  serializerMap.get(key).write(event);
                  serializerMap.get(key).flush();
                  streamMap.get(key).flush();
                }else {//时间不为当天时间
                  try {
                        serializerMap.get(key).flush();
                        serializerMap.get(key).beforeClose();
                        streamMap.get(key).close();
                        } catch (IOException e) {
                            sinkCounter.incrementConnectionFailedCount();
                        throw new EventDeliveryException("Unable to rotate file "
                              + key+ " while delivering event", e);
                        } finally {
                            serializerMap.remove(key);
                            streamMap.remove(key);
                        }
                  fileMap.put(key, nowDate);
                  streamMap.put(key, new BufferedOutputStream(
                              new FileOutputStream(logFile)));
                  serializerMap.put(key, EventSerializerFactory.getInstance(
                            serializerType, serializerContext, streamMap.get(key)));
                  serializerMap.get(key).afterCreate();
                  serializerMap.get(key).write(event);
                  serializerMap.get(key).flush();
                  streamMap.get(key).flush();
                }
            }else {//value为空 说明之前没有 该 key的信息
                  if(file.exists()){ //文件存在
                        fileMap.put(key, nowDate);
                        streamMap.put(key, new BufferedOutputStream(
                              new FileOutputStream(logFile,true)));
                        serializerMap.put(key, EventSerializerFactory.getInstance(
                              serializerType, serializerContext, streamMap.get(key)));
                        serializerMap.get(key).afterCreate();
                        serializerMap.get(key).write(event);
                        serializerMap.get(key).flush();
                        streamMap.get(key).flush();
                  }else {
                        fileMap.put(key, nowDate);
                        streamMap.put(key, new BufferedOutputStream(
                                    new FileOutputStream(logFile)));
                        serializerMap.put(key, EventSerializerFactory.getInstance(
                              serializerType, serializerContext, streamMap.get(key)));
                        serializerMap.get(key).afterCreate();
                        serializerMap.get(key).write(event);
                        serializerMap.get(key).flush();
                        streamMap.get(key).flush();
                  }
            }
       }
      int eventAttemptCounter = 0;
          sinkCounter.incrementEventDrainAttemptCount();
          eventAttemptCounter++;
      transaction.commit();
      sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to process transaction", ex);
    } finally {
      transaction.close();
    }
    return result;
}
@Override
public synchronized void stop() {
    logger.info("RollingFile sink {} stopping..."+ getName());
    sinkCounter.stop();
    super.stop();
      for (String key : serializerMap.keySet()) {
            try {
            serializerMap.get(key).flush();
            serializerMap.get(key).beforeClose();
            streamMap.get(key).close();
            } catch (IOException e) {
                  logger.error("Unable to close output stream. Exception follows.", e);
            }
      }
      sinkCounter.incrementConnectionClosedCount();
      fileMap.clear();
      serializerMap.clear();
      streamMap.clear();
    logger.debug("RollingFile sink {} stopped. Event metrics: {} = "+" getName():"+
      getName() +" sinkCounter:"+ sinkCounter);
}
public File getDirectory() {
    return directory;
}
public void setDirectory(File directory) {
    this.directory = directory;
}
public long getRollInterval() {
    return rollInterval;
}
public void setRollInterval(long rollInterval) {
    this.rollInterval = rollInterval;
}
}  由于我们接收的信息都是从不同机器传输过来的,但是落信息时需要根据传来的不同目录进行划分,所以这里不能用同步,并且每个写的操作都是独立的.需要在内存分开.

  

  




页: [1]
查看完整版本: flume 日志搬家下半场