设为首页 收藏本站
查看: 1332|回复: 0

[经验分享] flume 日志搬家下半场

[复制链接]
发表于 2019-1-30 09:55:08 | 显示全部楼层 |阅读模式
  续上面 获得资源后我们要将转换为相应的日志,落在统一的服务器中.
  在flume中的对file操作的sink只有RollingFileSink但这个对我们来一点用都没有,
package com.ule.flume.sink.file;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import com.google.common.base.Preconditions;
import com.ule.flume.util.Constants;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.flume.sink.AbstractSink;
import org.apache.log4j.Logger;
public class FileSink extends AbstractSink implements Configurable {
  private static final Logger logger = Logger.getLogger("sinklog");
  //每隔30秒滚动一个文件。指定0将禁用滚动,并导致所有事件被写入到一个单独的文件中
  private static final long defaultRollInterval = 30;
  private static final int defaultBatchSize = 100;
  private int batchSize = defaultBatchSize;
  private String filePrefix = "";    //文件前缀名
  private String dateType = "yyyy-MM-dd";//按照日期类型输出文件
  private static Map fileMap = new HashMap();
  private static Map streamMap = new HashMap();
  private static Map serializerMap = new HashMap();
  private  SimpleDateFormat format;
  private File directory;
  private long rollInterval;
  private OutputStream outputStream;
  private ScheduledExecutorService rollService;
  private String serializerType;
  private Context serializerContext;
  private EventSerializer serializer;
  private SinkCounter sinkCounter;
//  private FileManager pathController;
  private volatile boolean shouldRotate;
  public FileSink() {
    shouldRotate = false;
  }
  @Override
  public void configure(Context context) {
    String directory = context.getString("file.directory");
    filePrefix = context.getString( "file.filePrefix");
    dateType = context.getString( "file.dateType");
    String rollInterval = context.getString("file.rollInterval");
    serializerType = context.getString("sink.serializer", "TEXT");
    serializerContext =
        new Context(context.getSubProperties("sink." + EventSerializer.CTX_PREFIX));
    Preconditions.checkArgument(directory != null, "Directory may not be null");
    Preconditions.checkNotNull(serializerType, "Serializer type is undefined");
    if (rollInterval == null) {
      this.rollInterval = defaultRollInterval;
    } else {
      this.rollInterval = Long.parseLong(rollInterval);
    }
    batchSize = context.getInteger("file.batchSize", defaultBatchSize);
    this.directory = new File(directory);
    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }
  }
  @Override
  public synchronized void start() {
    logger.info("Starting {}..."+ this);
    sinkCounter.start();
    fileMap.put("start", "start");
    super.start();
  }
  
  @Override
  public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        Status result = Status.READY;
        String path = "";
        String logName = "";
        String infosString = "";
        try {
            transaction.begin();
            event = channel.take();
            if (event != null) {
            String eventinfos = new String(event.getBody());
            logger.debug("eventinfos:" + eventinfos);
            String[] infosStrings = eventinfos.split(Constants.SPLIT);
            if (infosStrings.length >= 3) {
                for (int i = 0; i < infosStrings.length; i++) {
                    switch (i) {
                    case 0:
                        infosString = infosStrings[0].substring(0,infosStrings[0].lastIndexOf("\n"));
                        break;
                    case 1:
                        path = infosStrings[1];
                        break;
                    case 2:
                        logName = infosStrings[2];
                        break;
                    }
                }
                logger.debug("path:" + path);
                logger.debug("logName:" + logName);
                logger.debug("infosString:" + infosString.toString());
                event.setBody(infosString.toString().getBytes());
            }
        }else {
             result = Status.BACKOFF;
        }
       if (StringUtils.isNotBlank(path)&&StringUtils.isNotBlank(logName)) {
            File file=new File(directory+File.separator+path);
            if  (!file.exists()  && !file.isDirectory())      
            {      
                 logger.debug("mkdirFile:" + file .mkdirs());  
            }
            format =  new SimpleDateFormat(dateType);
            String key = file+File.separator+logName;
            String nowDate = format.format(System.currentTimeMillis());
            if ("start".equals(fileMap.get("start"))) {//第一次启动
                sinkCounter.incrementConnectionCreatedCount();
                fileMap.put("start", "end");
            }
            String fileDate = fileMap.get(key);
             File logFile=new File(key+"."+nowDate);
            if (StringUtils.isNotBlank(fileDate)) {//value不为空 说明之前有该key的信息
                if (nowDate.equals(fileDate)) {//时间为当天时间
                    serializerMap.get(key).write(event);
                    serializerMap.get(key).flush();
                    streamMap.get(key).flush();
                }else {//时间不为当天时间
                    try {
                          serializerMap.get(key).flush();
                          serializerMap.get(key).beforeClose();
                          streamMap.get(key).close();
                        } catch (IOException e) {
                            sinkCounter.incrementConnectionFailedCount();
                          throw new EventDeliveryException("Unable to rotate file "
                              + key+ " while delivering event", e);
                        } finally {
                            serializerMap.remove(key);
                            streamMap.remove(key);
                        }
                    fileMap.put(key, nowDate);
                    streamMap.put(key, new BufferedOutputStream(
                                new FileOutputStream(logFile)));
                    serializerMap.put(key, EventSerializerFactory.getInstance(
                            serializerType, serializerContext, streamMap.get(key)));
                    serializerMap.get(key).afterCreate();
                    serializerMap.get(key).write(event);
                    serializerMap.get(key).flush();
                    streamMap.get(key).flush();
                }
            }else {//value为空 说明之前没有 该 key的信息
                    if  (file.exists()){ //文件存在
                        fileMap.put(key, nowDate);
                        streamMap.put(key, new BufferedOutputStream(
                                new FileOutputStream(logFile,true)));
                        serializerMap.put(key, EventSerializerFactory.getInstance(
                                serializerType, serializerContext, streamMap.get(key)));
                        serializerMap.get(key).afterCreate();
                        serializerMap.get(key).write(event);
                        serializerMap.get(key).flush();
                        streamMap.get(key).flush();
                    }else {
                        fileMap.put(key, nowDate);
                        streamMap.put(key, new BufferedOutputStream(
                                    new FileOutputStream(logFile)));
                        serializerMap.put(key, EventSerializerFactory.getInstance(
                                serializerType, serializerContext, streamMap.get(key)));
                        serializerMap.get(key).afterCreate();
                        serializerMap.get(key).write(event);
                        serializerMap.get(key).flush();
                        streamMap.get(key).flush();
                    }
            }
       }  
      int eventAttemptCounter = 0;
          sinkCounter.incrementEventDrainAttemptCount();
          eventAttemptCounter++;
      transaction.commit();
      sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to process transaction", ex);
    } finally {
      transaction.close();
    }
    return result;
  }
  @Override
  public synchronized void stop() {
    logger.info("RollingFile sink {} stopping..."+ getName());
    sinkCounter.stop();
    super.stop();
        for (String key : serializerMap.keySet()) {
            try {
            serializerMap.get(key).flush();
            serializerMap.get(key).beforeClose();
            streamMap.get(key).close();
            } catch (IOException e) {
                  logger.error("Unable to close output stream. Exception follows.", e);
            }
        }
        sinkCounter.incrementConnectionClosedCount();
        fileMap.clear();
        serializerMap.clear();
        streamMap.clear();
    logger.debug("RollingFile sink {} stopped. Event metrics: {} = "+" getName():"+
        getName() +" sinkCounter:"+ sinkCounter);
  }
  public File getDirectory() {
    return directory;
  }
  public void setDirectory(File directory) {
    this.directory = directory;
  }
  public long getRollInterval() {
    return rollInterval;
  }
  public void setRollInterval(long rollInterval) {
    this.rollInterval = rollInterval;
  }
}  由于我们接收的信息都是从不同机器传输过来的,但是落信息时需要根据传来的不同目录进行划分,所以这里不能用同步,并且每个写的操作都是独立的.需要在内存分开.

  

  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669463-1-1.html 上篇帖子: flume源码分析1 下篇帖子: 开源日志系统比较
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表