|
今天写的日志合并MapReduce程序,重新梳理了一遍写一个MapReduce Job的要点:
1. 参数获取。
我一般都会在参数中包含这几项:输入文件路径、工作路径(.finish文件所在的目录)、输出文件路径(结果数据所在的路径,在实际工程中,一般和工作路径不同)。还有一个wait/submit参数,用来说明Job是通过waitForCompletion还是submit的方式提交,waitForCompletion在测试和调试时用,submit在生产环境中用。
2. 参数检查
各种参数的格式检查,通不过就输出用法说明并以非零状态码退出(方便调用脚本检测失败),这一步要严格。
3. 创建Job
4. 设定mapper、reducer
可能还需要设定partitioner,sort comparator, grouping comparator,因任务的复杂程度而定。
5. 设定输入和工作路径
注意FileOutputFormat.setOutputPath(job, new Path(workingDir));设置的是workingDir,在实践中一般都将workingDir和最终数据的outputDir分开。主要是因为workingDir得到的数据都是part-00000这样的形式,不能自己命名。所以一般会在最后reducer中自己用FileWriter去创建结果数据文件,而不用context.write。
6. 设定输入和输出文件格式
7. 设置配置项
为了在mapper、reducer以及Job的其他worker之间共享一些简单的数据,可以通过job.getConfiguration()拿到Configuration并set配置项(旧API中对应JobConf)。如果要共享复杂、量大的数据,可以使用DistributedCache。在最近的实践中,有用序列化文件+DistributedCache在各个Job worker之间共享HashMap、List以及其他自定义数据结构的经验,可行。
8. 提交Job
代码如下,敬请批评。
1 import java.io.IOException;
2 import java.util.regex.Matcher;
3 import java.util.regex.Pattern;
4
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.mapred.JobConf;
8 import org.apache.hadoop.mapreduce.Job;
9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
10 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
13 import org.apache.hadoop.util.GenericOptionsParser;
14
15 import com.hadoop.compression.lzo.LzopCodec;
16
17 /**
18 * MapReduce job to combine all hourly logs from different data-collection servers
19 * @author lsyang, 20130507
20 */
21 public class HourlyLogCombine {
22 private static String RAW_FILE_PREFIX = "post_";
23 private static String RAW_FILE_POSTFIX = ".log";
24
25 public static String JOB_CONF_DATE = "HourlyLogCombine.Date";
26 public static String JOB_CONF_HOUR = "HourlyLogCombine.Hour";
27 public static String JOB_CONF_OUTDIR = "HourlyLogCombine.OutDir";
28
29 private static void showHelpAndExit(String info) {
30 System.err.println("Usage: HourlyLogCombine " +
31 " " +
32 "" +
33 "" +
34 "");
35 if(info != null && !info.isEmpty()) {
36 System.err.println("Error: " + info);
37 }
38 System.exit(0);
39 }
40
41 private static void checkDate(String date) {
42 String regex = "^(20\\d\\d)(0\\d|1[012])(0[1-9]|[12][0-9]|3[01])$";
43 Pattern pattern = Pattern.compile(regex);
44 Matcher matcher = pattern.matcher(date);
45 if (!matcher.find()) {
46 showHelpAndExit("wrong date format.");
47 }
48 }
49
50 private static void checkHour(String hour) {
51 String regex = "^[0-1]\\d|2[0-3]$";
52 Pattern pattern = Pattern.compile(regex);
53 Matcher matcher = pattern.matcher(hour);
54 if (!matcher.find()) {
55 showHelpAndExit("wrong hour format.");
56 }
57 }
58
59 private static boolean checkWaitOrSubmit(String waitORsubmit) {
60 if (waitORsubmit.equalsIgnoreCase("wait")) {
61 return true;
62 } else if (waitORsubmit.equalsIgnoreCase("submit")) {
63 return false;
64 } else {
65 showHelpAndExit("wait or submit: please check the spelling.");
66 return false;
67 }
68 }
69
70 public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
71 // get the application-specific arguments
72 Configuration conf = new Configuration();
73 String[] params = new GenericOptionsParser(conf, args).getRemainingArgs();
74 if(params.length != 6) {
75 showHelpAndExit("6 params needed.");
76 }
77
78 // parameters
79 String date = params[0];
80 String hour = params[1];
81 String rawLogHome = params[2];
82 String workingHome = params[3];
83 String combinedLogHome = params[4];
84 String waitORsubmit = params[5];
85 if (!rawLogHome.endsWith("/")) rawLogHome += "/";
86 if(!combinedLogHome.endsWith("/")) combinedLogHome += "/";
87
88 // check parameters
89 checkDate(date);
90 checkHour(hour);
91 boolean wait = checkWaitOrSubmit(waitORsubmit);
92
93 // get input files
94 String inputFiles = rawLogHome + "*/" + date + "/" + RAW_FILE_PREFIX + date + "_" + hour + RAW_FILE_POSTFIX;
95 // get working dir, where the .finish file resides
96 String workingDir = workingHome + date + "/" + hour + "/";
97 // get output dir, where the combined log file resides
98 String outDir = combinedLogHome + date + "/";
99
100 // create a mapreduce job
101 Job job = new Job(conf, "HourlyLogCombine");
102 job.setJarByClass(HourlyLogCombine.class);
103
104 // set mapper, partitioner and reducer
105 job.setMapperClass(HourlyLogCombineMapper.class);
106 job.setPartitionerClass(HourlyLogCombinePartitioner.class);
107 job.setReducerClass(HourlyLogCombineReducer.class);
108
109 // set input and output dir
110 FileInputFormat.addInputPath(job, new Path(inputFiles));
111 FileOutputFormat.setOutputPath(job, new Path(workingDir));
112
113 // set input and output file format
114 job.setInputFormatClass(TextInputFormat.class);
115 job.setOutputFormatClass(TextOutputFormat.class);
116 TextOutputFormat.setCompressOutput(job, true);
117 TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
118
119 // set configurations
120 JobConf jobConf = (JobConf)job.getConfiguration();
121 jobConf.set(JOB_CONF_DATE, date);
122 jobConf.set(JOB_CONF_HOUR, hour);
123 jobConf.set(JOB_CONF_OUTDIR, outDir);
124
125 // run the job
126 if (wait) {
127 job.waitForCompletion(true);
128 } else {
129 job.submit();
130 }
131 }
132 }
|
|
|