A word-count-style MapReduce statistics program.

HDFS data format. A sample record:

02-26 08:01:56 INFO async-statistics - class com.spring.aop.StorageManagerStatAspect${"method":"com.systoon.scloud.master.controller.ImageController.download","ip":"172.28.6.131","port":"38001","father":"sun.reflect.GeneratedMethodAccessor8.invoke/null/-1","requestIp":"106.39.33.246","argsMap":{"org.eclipse.jetty.server.Request:0":{"requestURI":"/f/KZ0wxxbvFz924VaHS8JN1Fk42jV9OBMCHYoLtuc9sAkfF.jpg"},"org.eclipse.jetty.server.Response:1":1462183982},"processTime":50,"time":1456444916225,"retValMap":{":":"this object is null"}}

Each record is one line of log output, structured as: timestamp + logging class + JSON payload.
The goal is an MR job that counts these records by request type.
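The parsing step is worth seeing in isolation: split the line on the aspect class name, drop the leading $ from the remainder, and hand the rest to fastjson. A minimal standalone sketch (the sample line is abbreviated here, and LogLineParseDemo is just an illustrative name):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class LogLineParseDemo {
    public static void main(String[] args) {
        String clazz = "com.spring.aop.StorageManagerStatAspect";
        // abbreviated sample line: timestamp + logging class + '$' + JSON payload
        String line = "02-26 08:01:56 INFO async-statistics - class " + clazz
                + "${\"method\":\"com.systoon.scloud.master.controller.ImageController.download\","
                + "\"processTime\":50,\"time\":1456444916225}";
        String json = line.split(clazz)[1].substring(1); // keep only the JSON payload
        JSONObject obj = JSON.parseObject(json);
        System.out.println(obj.getString("method"));     // prints the invoked method name
    }
}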
Here is the full program:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rocky.util.TimeUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.util.Progressable;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

public class MulOutput {

    // aspect class that emits the statistics log lines, and the two methods we count
    public static final String clazz = "com.spring.aop.StorageManagerStatAspect";
    public static final String m_download = "com.systoon.scloud.master.controller.ImageController.download";
    public static final String m_upload = "com.systoon.scloud.master.controller.DirectUploadFile.directUploadFile";

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {

            String line = value.toString();
            if (line.contains(clazz)) {
                if (line.contains(m_download)) {
                    // split() returns an array; the JSON payload follows the aspect class name
                    String tempObject = line.split(clazz)[1];
                    // drop the leading '$' so only the JSON object remains
                    String tmp = tempObject.substring(1);
                    JSONObject jsonObject = JSON.parseObject(tmp);
                    String method = jsonObject.get("method").toString();
                    if (method.equals(m_download)) {
                        word.set("download");
                        output.collect(word, one);
                    }
                } else if (line.contains(m_upload)) {
                    String tempObject = line.split(clazz)[1];
                    String tmp = tempObject.substring(1);
                    JSONObject jsonObject = JSON.parseObject(tmp);
                    String method = jsonObject.get("method").toString();
                    if (method.equals(m_upload)) {
                        word.set("upload");
                        output.collect(word, one);
                    }
                } else {
                    // aspect line, but neither download nor upload
                    word.set("debug");
                    output.collect(word, one);
                }
            } else {
                word.set("others");
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // sum all counts emitted for this key
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf(MulOutput.class);
        jobConf.setJobName("rocky_test");

        // recreate the output directory if it already exists
        String outPath = "/test/mapReduce/statis" + TimeUtils.getStringDate();
        final FileSystem filesystem = FileSystem.get(new URI(outPath), jobConf);
        if (filesystem.exists(new Path(outPath))) {
            filesystem.delete(new Path(outPath), true);
        }

        jobConf.setMapperClass(Map.class);                 // Mapper class for the job
        jobConf.setMapOutputKeyClass(Text.class);          // map output key class
        jobConf.setMapOutputValueClass(IntWritable.class); // map output value class

        jobConf.setCombinerClass(Reduce.class);            // Combiner class for the job

        jobConf.setReducerClass(Reduce.class);             // Reducer class for the job
        jobConf.setOutputKeyClass(Text.class);             // job output key class
        jobConf.setOutputValueClass(IntWritable.class);    // job output value class

        FileInputFormat.setInputPaths(jobConf, new Path("/test/mapReduce/statistics.log.2016-02-26"));

//        // to add further input paths:
//        FileInputFormat.addInputPath();

        jobConf.setOutputFormat(MyMultipleFilesTextOutputFormat.class); // custom output format (sketched below)
        FileOutputFormat.setOutputPath(jobConf, new Path(outPath));

        JobClient.runJob(jobConf); // submit the job and wait for completion
    }
}
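Note that the listing references MyMultipleFilesTextOutputFormat without defining it. Here is a minimal sketch of what such a class could look like, assuming (as the imports above suggest) it extends the old-API MultipleOutputFormat and routes each key to its own output file:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.util.Progressable;

import java.io.IOException;

public class MyMultipleFilesTextOutputFormat extends MultipleOutputFormat<Text, IntWritable> {

    private final TextOutputFormat<Text, IntWritable> textOutput =
            new TextOutputFormat<Text, IntWritable>();

    @Override
    protected RecordWriter<Text, IntWritable> getBaseRecordWriter(FileSystem fs, JobConf job,
            String name, Progressable progress) throws IOException {
        // delegate the actual writing to a plain text output format
        return textOutput.getRecordWriter(fs, job, name, progress);
    }

    @Override
    protected String generateFileNameForKeyValue(Text key, IntWritable value, String name) {
        // one output file per counter: download, upload, debug, others
        return key.toString();
    }
}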
In short: Map reads the log line by line, Reduce sums the counts. This is the most common pattern in statistics jobs, and it solves the problem with very little code.
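To try it, package the job (bundling fastjson and the TimeUtils helper) and submit it with the standard hadoop launcher; the jar name here is just an example:

hadoop jar mul-output.jar MulOutput

With the output format sketched above, the result directory should contain one file per key (download, upload, debug, others), each line holding the key and its total count.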