帝王 posted on 2019-1-30 09:38:53

Writing Flume output to HDFS partitioned by log time

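  Flume writes to HDFS in the HDFSEventSink.process method, and the target path is built by BucketPath.
  Reading its source (reference: http://caiguangguang.blog.运维网.com/1652935/1619539)
  shows that %{} header substitution can do the job: extract the time field from the event (the local time of the nginx log) and pass it into hdfs.path through event headers.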
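
The %{} substitution can be pictured with a small standalone sketch. This is not Flume's BucketPath code: the class HeaderPathSketch and the method expand are hypothetical names, and turning a missing header into an empty string is an assumption made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of %{header} expansion, not Flume's implementation.
class HeaderPathSketch {
    // Matches placeholders of the form %{name}, for example %{eventdate}.
    private static final Pattern HEADER_TAG = Pattern.compile("%\\{([^}]+)\\}");

    // Replaces each %{name} with headers.get(name).
    // Assumption for this sketch: a missing header becomes an empty string.
    static String expand(String path, Map<String, String> headers) {
        Matcher m = HEADER_TAG.matcher(path);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = headers.getOrDefault(m.group(1), "");
            // quoteReplacement keeps '$' and '\' in header values literal.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("eventdate", "20150409");
        headers.put("eventhour", "12");
        System.out.println(expand(
            "hdfs://xxx:8020/data/flume/mobile-ubt-all/%{eventdate}/%{eventhour}", headers));
    }
}
```

With eventdate=20150409 and eventhour=12 this prints hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12, so the directory is decided entirely by the headers attached to each event.
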
  The implementation is as follows:
  1. In the process method of KafkaSource, add:
          dt = KafkaSourceUtil.getDateMessage(new String(kafkaMessage));
          hour = KafkaSourceUtil.getHourMessage(new String(kafkaMessage));
          headers.put("eventdate",dt);
          headers.put("eventhour",hour);
          log.debug("source get one event header info");
  This adds two headers that record the day and the hour of the log entry.
  2. The methods in KafkaSourceUtil
  Our message body is JSON, so the Java json-lib package is used. For example, to get the day of a message:
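// Imports needed at the top of KafkaSourceUtil (JSONObject comes from json-lib).
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import net.sf.json.JSONObject;

public static String getDateMessage(String message) {
    String dt = null;
    JSONObject json = JSONObject.fromObject(message);
    // The "message" field is split on tabs to reach the individual log fields.
    String[] splitMessage = json.getString("message").split("\t");
    // As posted, trim() was called on the array itself, which does not compile.
    // An index is required; timeFieldIndex is a placeholder for the position of
    // the local time field, whose value is not given in the post.
    String logTime = splitMessage[timeFieldIndex].trim();
    log.debug("in getDateMessage logTime is: " + logTime);
    // The pattern is empty in the post; it must match the nginx local time format.
    String format = "";
    SimpleDateFormat rawDateFormat = new SimpleDateFormat(format, Locale.ENGLISH);
    SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyyMMdd");
    try {
      Date date = rawDateFormat.parse(logTime);
      dt = dateFormat2.format(date);
      log.debug("in getDateMessage dt is: " + dt);
    } catch (Exception ex) {
      // Unparseable lines go to a fixed "empty" bucket.
      dt = "empty";
    }
    return dt;
}

  3. Reference the headers in hdfs.path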
agent-server4.sinks.hdfs-sink2.type = hdfs
agent-server4.sinks.hdfs-sink2.hdfs.path = hdfs://xxx:8020/data/flume/mobile-ubt-all/%{eventdate}/%{eventhour}
  Resulting log:
flume-server4.log.3:09 Apr 2015 15:18:49,966 DEBUG (org.apache.flume.sink.hdfs.BucketWriter$2.call:276)- Rolling file (hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866.tmp): Roll scheduled after 60 sec elapsed.
flume-server4.log.3:09 Apr 2015 15:18:49,969 INFO (org.apache.flume.sink.hdfs.BucketWriter.close:363)- Closing hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866.tmp
flume-server4.log.3:09 Apr 2015 15:18:49,990 INFO (org.apache.flume.sink.hdfs.BucketWriter$8.call:629)- Renaming hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866.tmp to hdfs://192.168.101.6:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866
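
In the log above the sink is writing at 15:18 but the file lands under 20150409/12, which is what bucketing by event time rather than write time looks like. getHourMessage is called in step 1 but not shown in this post, so here is a minimal standalone sketch of deriving both values from one timestamp. Everything in it is an assumption: the class LogTimeBuckets and its methods are hypothetical names, the input pattern is assumed to be nginx $time_local (for example 09/Apr/2015:12:18:49 +0800), and java.time is used instead of the SimpleDateFormat from the post.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;

// Hypothetical helper, not the KafkaSourceUtil from the post.
class LogTimeBuckets {
    // Assumed input pattern: nginx $time_local, e.g. 09/Apr/2015:12:18:49 +0800.
    private static final DateTimeFormatter RAW =
        DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");

    // Day bucket, e.g. 20150409.
    static String day(String logTime) {
        return format(logTime, DAY);
    }

    // Hour bucket, e.g. 12.
    static String hour(String logTime) {
        return format(logTime, HOUR);
    }

    private static String format(String logTime, DateTimeFormatter target) {
        try {
            return OffsetDateTime.parse(logTime.trim(), RAW).format(target);
        } catch (DateTimeParseException ex) {
            // Same fallback bucket name as getDateMessage in the post.
            return "empty";
        }
    }

    public static void main(String[] args) {
        String t = "09/Apr/2015:12:18:49 +0800";
        System.out.println(day(t) + "/" + hour(t));
    }
}
```

One difference worth knowing: SimpleDateFormat.format renders in the JVM default time zone, while this sketch keeps the offset written in the log line, so the two can disagree when the agent and the web server run in different time zones.
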

