设为首页 收藏本站
查看: 812|回复: 0

[经验分享] [Hadoop编程实践]一个实用、清晰的MapReduce程序

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-11 11:00:07 | 显示全部楼层 |阅读模式
  今天写的日志合并MapReduce程序,重新梳理了一遍写一个MapReduce Job的要点:
  1. 参数获取。
  我一般都会在参数中包含这几项:输入文件路径、工作路径(.finish文件所在的目录)、输出文件路径(结果数据所在的路径,在实际工程中,一般和工作路径不同)。还有一个wait/submit参数,用来说明Job是通过waitForCompletion还是submit的方式提交,waitForCompletion在测试和调试时用,submit在生产环境中用。
  2. 参数检查
  各种参数的格式检查,通不过就直接退出,这一步要严格。
  3. 创建Job
  4. 设定mapper、reducer
  可能还需要设定partitioner,sort comparator, grouping comparator,因任务的复杂程度而定。
  5. 设定输入和工作路径
  注意FileOutputFormat.setOutputPath(job, new Path(workingDir));设置的是workingDir,在实践中一般都将workingDir和最终数据的outputDir分开。主要是因为workingDir得到的数据都是part-00000这样的形式,不能自己命名。所以一般会在最后reducer中自己用FileWriter去创建结果数据文件,不用context.write.
  6. 设定输入和输出文件格式
  7. 设置配置项
  为了在mapper、reducer以及Job的其他worker之间共享一些简单的数据,可以使用JobConf. 如果要共享复杂、量大的数据,可以使用DistributedCache。在最近的实践中,有用序列化文件+DistributedCache在各个Job worker之间共享HashMap,List以及其他自定义数据结构的经验,可行。
  8. 提交Job
  
  代码如下,敬请批评。



  1 import java.io.IOException;
  2 import java.util.regex.Matcher;
  3 import java.util.regex.Pattern;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.mapred.JobConf;
  8 import org.apache.hadoop.mapreduce.Job;
  9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
10 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
13 import org.apache.hadoop.util.GenericOptionsParser;
14
15 import com.hadoop.compression.lzo.LzopCodec;
16
17 /**
18  * MapReduce job to combine all hourly logs from different data-collection servers
19  * @author lsyang, 20130507
20  */
21 public class HourlyLogCombine {
22     private static String RAW_FILE_PREFIX = "post_";
23     private static String RAW_FILE_POSTFIX = ".log";
24     
25     public static String JOB_CONF_DATE = "HourlyLogCombine.Date";
26     public static String JOB_CONF_HOUR = "HourlyLogCombine.Hour";
27     public static String JOB_CONF_OUTDIR = "HourlyLogCombine.OutDir";
28     
29     private static void showHelpAndExit(String info) {
30         System.err.println("Usage: HourlyLogCombine   " +
31                 " " +
32                 "" +
33                 "" +
34                 "");
35         if(info != null && !info.isEmpty()) {
36             System.err.println("Error: " + info);
37         }
38         System.exit(0);
39     }
40     
41     private static void checkDate(String date) {
42         String regex = "^(20\\d\\d)(0\\d|1[012])(0[1-9]|[12][0-9]|3[01])$";
43         Pattern pattern = Pattern.compile(regex);
44         Matcher matcher = pattern.matcher(date);
45         if (!matcher.find()) {
46             showHelpAndExit("wrong date format.");
47         }
48     }
49
50     private static void checkHour(String hour) {
51         String regex = "^[0-1]\\d|2[0-3]$";
52         Pattern pattern = Pattern.compile(regex);
53         Matcher matcher = pattern.matcher(hour);
54         if (!matcher.find()) {
55             showHelpAndExit("wrong hour format.");
56         }
57     }
58     
59     private static boolean checkWaitOrSubmit(String waitORsubmit) {
60         if (waitORsubmit.equalsIgnoreCase("wait")) {
61             return true;
62         } else if (waitORsubmit.equalsIgnoreCase("submit")) {
63             return false;
64         } else {
65             showHelpAndExit("wait or submit: please check the spelling.");
66             return false;
67         }
68     }
69     
70     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
71         // get the application-specific arguments
72         Configuration conf = new Configuration();
73         String[] params = new GenericOptionsParser(conf, args).getRemainingArgs();
74         if(params.length != 6) {
75             showHelpAndExit("6 params needed.");
76         }
77         
78         // parameters
79         String date = params[0];
80         String hour = params[1];
81         String rawLogHome = params[2];
82         String workingHome = params[3];
83         String combinedLogHome = params[4];
84         String waitORsubmit = params[5];
85         if (!rawLogHome.endsWith("/")) rawLogHome += "/";
86         if(!combinedLogHome.endsWith("/")) combinedLogHome += "/";
87         
88         // check parameters
89         checkDate(date);
90         checkHour(hour);
91         boolean wait = checkWaitOrSubmit(waitORsubmit);
92         
93         // get input files
94         String inputFiles = rawLogHome + "*/" + date + "/" + RAW_FILE_PREFIX + date + "_" + hour + RAW_FILE_POSTFIX;
95         // get working dir, where the .finish file resides
96         String workingDir = workingHome + date + "/" + hour + "/";
97         // get output dir, where the combined log file resides
98         String outDir = combinedLogHome + date + "/";
99         
100         // create a mapreduce job
101         Job job = new Job(conf, "HourlyLogCombine");
102         job.setJarByClass(HourlyLogCombine.class);
103         
104         // set mapper, partitioner and reducer
105         job.setMapperClass(HourlyLogCombineMapper.class);
106         job.setPartitionerClass(HourlyLogCombinePartitioner.class);
107         job.setReducerClass(HourlyLogCombineReducer.class);
108         
109         // set input and output dir
110         FileInputFormat.addInputPath(job, new Path(inputFiles));
111         FileOutputFormat.setOutputPath(job, new Path(workingDir));
112         
113         // set input and output file format
114         job.setInputFormatClass(TextInputFormat.class);
115         job.setOutputFormatClass(TextOutputFormat.class);
116         TextOutputFormat.setCompressOutput(job, true);
117         TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
118         
119         // set configurations
120         JobConf jobConf = (JobConf)job.getConfiguration();
121         jobConf.set(JOB_CONF_DATE, date);
122         jobConf.set(JOB_CONF_HOUR, hour);
123         jobConf.set(JOB_CONF_OUTDIR, outDir);
124         
125         // run the job
126         if (wait) {
127             job.waitForCompletion(true);
128         } else {
129             job.submit();
130         }
131     }
132 }
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85486-1-1.html 上篇帖子: [翻译]Ambari,hadoop的配置,管理和监控项目入门 下篇帖子: hadoop示例程序Grep分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表