Rainie999 发表于 2017-3-2 09:30:45

仿分词统计的MapReduce 程序。

HDFS 数据格式 :
举例单条数据:02-26 08:01:56 INFOasync-statistics - class com.spring.aop.StorageManagerStatAspect${"method":"com.systoon.scloud.master.controller.ImageController.download","ip":"172.28.6.131","port":"38001","father":"sun.reflect.GeneratedMethodAccessor8.invoke/null/-1","requestIp":"106.39.33.246","argsMap":{"org.eclipse.jetty.server.Request:0":{"requestURI":"/f/KZ0wxxbvFz924VaHS8JN1Fk42jV9OBMCHYoLtuc9sAkfF.jpg"},"org.eclipse.jetty.server.Response:1":1462183982},"processTime":50,"time":1456444916225,"retValMap":{":":"this object is null"}}


是写出的一行日志。日志结构是时间 + 打印的类 + JSON
那么现在是要进行一个统计 MR 分析。


那么开始上代码:
       


[*]
[*]import com.alibaba.fastjson.JSON;
[*]import com.alibaba.fastjson.JSONObject;
[*]import com.rocky.util.TimeUtils;
[*]import org.apache.hadoop.fs.FileSystem;
[*]import org.apache.hadoop.fs.Path;
[*]import org.apache.hadoop.io.IntWritable;
[*]import org.apache.hadoop.io.LongWritable;
[*]import org.apache.hadoop.io.Text;
[*]import org.apache.hadoop.mapred.*;
[*]import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
[*]import org.apache.hadoop.util.Progressable;
[*]
[*]import java.io.IOException;
[*]import java.net.URI;
[*]import java.util.Iterator;
[*]
[*]public class MulOutput {
[*]
[*]    public static final String clazz = "com.spring.aop.StorageManagerStatAspect";
[*]    public static final String m_download = "com.systoon.scloud.master.controller.ImageController.download";
[*]    public static final String m_upload   = "com.systoon.scloud.master.controller.DirectUploadFile.directUploadFile";
[*]
[*]    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{
[*]
[*]      private final static IntWritable one = new IntWritable(1);
[*]      Text word = new Text();
[*]
[*]      @Override
[*]      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
[*]                        Reporter reporter) throws IOException {
[*]
[*]            String line = value.toString();
[*]            if(line.contains(clazz)){
[*]                if(line.contains(m_download)){
[*]                  String tempObject = line.split(clazz);
[*]                  String tmp = tempObject.substring(1,tempObject.length());
[*]                  JSONObject jsonObject = JSON.parseObject(tmp);
[*]                  String method = jsonObject.get("method").toString();
[*]                  if( method.equals(m_download) ){
[*]                        word.set("download");
[*]                        output.collect(word, one);
[*]                  }
[*]                } else if(line.contains(m_upload)) {
[*]                  String tempObject = line.split(clazz);
[*]                  String tmp = tempObject.substring(1,tempObject.length());
[*]                  JSONObject jsonObject = JSON.parseObject(tmp);
[*]                  String method = jsonObject.get("method").toString();
[*]                  if( method.equals(m_upload) ){
[*]                        word.set("upload");
[*]                        output.collect(word, one);
[*]                  }
[*]                } else {
[*]                  word.set("debug");
[*]                  output.collect(word,one);
[*]                }
[*]            } else {
[*]                word.set("others");
[*]                output.collect(word, one);
[*]
[*]            }
[*]      }
[*]    }
[*]
[*]
[*]    public static class Reduce extends MapReduceBase
[*]            implements Reducer<Text, IntWritable, Text, IntWritable> {
[*]      public void reduce(Text key, Iterator<IntWritable> values,
[*]                           OutputCollector<Text, IntWritable> output, Reporter reporter)
[*]      throws IOException{
[*]            int sum = 0;
[*]            while (values.hasNext()) {
[*]                sum += values.next().get();
[*]            }
[*]            output.collect(key, new IntWritable(sum));
[*]      }
[*]    }
[*]
[*]    public static void main(String[] args) throws Exception{
[*]      JobConf jobConf = new JobConf(MulOutput.class);
[*]      jobConf.setJobName("rocky_test");
[*]
[*]      String outPath = "/test/mapReduce/statis"+TimeUtils.getStringDate();
[*]      final FileSystem filesystem = FileSystem.get(new URI(outPath), jobConf);
[*]      if(filesystem.exists(new Path(outPath))){
[*]            filesystem.delete(new Path(outPath), true);
[*]      }
[*]
[*]      jobConf.setMapperClass(Map.class);         //为job设置Mapper类
[*]      jobConf.setMapOutputKeyClass(Text.class);   //输出数据设置Key类
[*]      jobConf.setMapOutputValueClass(IntWritable.class);//输出数据设置Key类
[*]
[*]      jobConf.setCombinerClass(Reduce.class);             // 为job设置Combiner类
[*]
[*]      jobConf.setReducerClass(Reduce.class);            // 为job设置Reduce类
[*]      jobConf.setOutputKeyClass(Text.class);            // 输出数据设置Key类
[*]      jobConf.setOutputValueClass(IntWritable.class);   // 输出数据设置Key类
[*]
[*]
[*]      FileInputFormat.setInputPaths(jobConf, new Path("/test/mapReduce/statistics.log.2016-02-26"));
[*]
[*]//      // 扫描组合path
[*]//      FileInputFormat.addInputPath();
[*]
[*]      jobConf.setOutputFormat(MyMultipleFilesTextOutputFormat.class);
[*]      FileOutputFormat.setOutputPath(jobConf, new Path(outPath));
[*]
[*]      JobClient.runJob(jobConf);         //运行一个job
[*]
[*]    }
[*]}





简单来讲就是 Map 按行读, Reduce 进行汇总。   也是统计中最最常用的。轻松解决问题。



















来自为知笔记(Wiz)

附件列表
页: [1]
查看完整版本: 仿分词统计的MapReduce 程序。