|
1 package zhouls.bigdata.myMapReduce.TemperatureTest;
2
3 import java.io.IOException;
4
5 import org.apache.hadoop.io.IntWritable;
6 import org.apache.hadoop.io.LongWritable;
7 import org.apache.hadoop.io.Text;
8 import org.apache.hadoop.mapreduce.Mapper;
9 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
10 import org.apache.hadoop.conf.Configuration;
11 import org.apache.hadoop.conf.Configured;
12 import org.apache.hadoop.fs.FileSystem;
13 import org.apache.hadoop.fs.Path;
14 import org.apache.hadoop.io.IntWritable;
15 import org.apache.hadoop.io.Text;
16 import org.apache.hadoop.mapreduce.Job;
17 import org.apache.hadoop.mapreduce.Mapper;
18 import org.apache.hadoop.mapreduce.Reducer;
19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
20 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
22 import org.apache.hadoop.util.Tool;
23 import org.apache.hadoop.util.ToolRunner;
24
25
26 /*
27 Hadoop内置的数据类型:
28 BooleanWritable:标准布尔型数值
29 ByteWritable:单字节数值
30 DoubleWritable:双字节数值
31 FloatWritable:浮点数
32 IntWritable:整型数
33 LongWritable:长整型数
34 Text:使用UTF8格式存储的文本
35 NullWritable:当<key, value>中的key或value为空时使用
36 */
37
38
39 /**
40 * 统计美国每个气象站30年来的平均气温
41 * 1、编写map()函数
42 * 2、编写reduce()函数
43 * 3、编写run()执行方法,负责运行MapReduce作业
44 * 4、在main()方法中运行程序
45 *
46 * @author zhouls
47 *
48 */
49 //继承Configured类,实现Tool接口
50 public>
51 public static>
52 //输入的key,输入的value,输出的key,输出的value
53 //输入的LongWritable键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。
54
55 // 在这种情况下,我们将气象站id按 Text 对象进行读/写(因为我们把气象站id当作键),将气温值封装在 IntWritale 类型中。只有气温数据不缺失,这些数据才会被写入输出记录中。
56
57
58 // map 函数的功能仅限于提取气象站和气温信息
59
60 /**
61 * @function Mapper 解析气象站数据
62 * @input key=偏移量 value=气象站数据
63 * @output key=weatherStationId value=temperature
64 */
65 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
66 //map()函数还提供了context实例,用于键值对的输出 或者说 map() 方法还提供了 Context 实例用于输出内容的写入
67
68 // 就本示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是气象站id,输出值是气温(整数)。
69 // 同时context作为了map和reduce执行中各个函数的一个桥梁,这个设计和Java web中的session对象、application对象很相似
70
71
72 //第一步,我们将每行气象站数据转换为每行的String类型
73 String line = value.toString(); //每行气象数据
74 // values是1980 12 01 00 78 -17 10237 180 21 1 0 0
75 // line是"1980 12 01 00 78 -17 10237 180 21 1 0 0"
76
77
78 //第二步:提取气温值
79 int temperature = Integer.parseInt(line.substring(14, 19).trim());//每小时气温值
80 //需要转换为整形,截取第14位到19位,从第0位开始,trim()的功能是去掉首尾空格。
81 //substring()方法截取我们业务需要的值
82
83 // substring(start, stop)其内容是从 start 处到 stop-1 处的所有字符,其长度为 stop 减 start。
84
85 // 如Hello world! 若是substring(3,7) 则是lo w
86
87 // Integer.parseInt() 返回的是一个int的值。在这里, 给temperature
88
89 // new Integer.valueof()返回的是Integer的对象。
90 // Integer.parseInt() 返回的是一个int的值。
91 // new Integer.valueof().intValue();返回的也是一个int的值。
92
93
94
95
96
97 // 1980 12 01 00 78 -17 10237 180 21 1 0 0
98 //78是气温值
99
100 // temperature是78
101
102 // 30yr_03103.dat
103 // 30yr_03812.dat
104 // 30yr_03813.dat
105 // 30yr_03816.dat
106 // 30yr_03820.dat
107 // 30yr_03822.dat
108 // 30yr_03856.dat
109 // 30yr_03860.dat
110 // 30yr_03870.dat
111 // 30yr_03872.dat
112
113
114 // (0,1985 07 31 02 200 94 10137 220 26 1 0 -9999)
115 // (62,1985 07 31 03 172 94 10142 240 0 0 0 -9999)
116 // (124,1985 07 31 04 156 83 10148 260 10 0 0 -9999)
117 // (186,1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999)
118 // (248,1985 07 31 06 122 72 -9999 90 0 -9999 0 0)
119 // (310,1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999)
120 // (371,1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999)
121 // (434,1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999)
122 // (497,1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999)
123 // (560,1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999)
124
125 // (03103,[200,172,156,133,122,117,111,111,106,100])
126
127 // 根据自己业务需要 , map 函数的功能仅限于提取气象站和气温信息
128
129
130 // 1998 #year
131 // 03 #month
132 // 09 #day
133 // 17 #hour
134 // 11 #temperature 感兴趣
135 // -100 #dew
136 // 10237 #pressure
137 // 60 #wind_direction
138 // 72 #wind_speed
139 // 0 #sky_condition
140 // 0 #rain_1h
141 // -9999 #rain_6h
142
143
144 if (temperature != -9999){//过滤无效数据
145 //第三步:提取气象站编号
146 //获取输入分片
147 FileSplit fileSplit = (FileSplit) context.getInputSplit();//提取问加你输入分片,并转换类型
148 // 即由InputSplit -> FileSplit
149
150 // context.getInputSplit()
151 // (FileSplit) context.getInputSplit()这是强制转换
152 // fileSplit的值是file:/D:/Code/MyEclipseJavaCode/myMapReduce/data/temperature/30yr_03870.dat:0+16357956
153 // 即,读的是30yr_03870.dat这个文件
154
155
156 //然后通过文件名称提取气象站编号
157 String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通过文件名称提取气象站id
158 //首先通过文件分片fileSplit来获取文件路径,然后再获取文件名字,然后截取第5位到第10位就可以得到气象站 编号
159 // fileSplit.getPath()
160 // fileSplit.getPath().getName()
161
162 // 30yr_03870.dat 我们只需获取03870就是气象站编号
163
164 // fileSplit.getPath().getName().substring(5, 10) //从0开始,即第5个开始截取,到第10个为止,第10个没有拿到,所以为03870
165 // weatherStationId是03870
166
167
168
169 context.write(new Text(weatherStationId), new IntWritable(temperature));//写入weatherStationId是k2,temperature是v2
170 // context.write(weatherStationId,temperature);等价 ,但是若是直接这样写会出错,因为, weatherStationId是String类型,注意与Text类型还是有区别的!
171 //气象站编号,气温值
172 }
173 }
174 }
175
176
177
178 public static>
179 private IntWritable result = new IntWritable();//存取结果
180 //因为气温是IntWritable类型
181 public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
182 // Iterable<IntWritable> values是iterable(迭代器)变量
183
184
185 // Iterable<IntWritable> values和IntWritable values这样有什么区别?
186 // 前者是iterable(迭代器)变量,后者是intwriteable(int的封装)变量
187
188
189 // Iterable<IntWritable> values
190 // 迭代器,valuses是 iterable(迭代器)变量,类型是IntWritable
191
192
193 //reduce输出的key,key的集合,context的实例
194 //第一步:统计相同气象站的所有气温
195 int sum = 0;
196 int count = 0;
197 for (IntWritable val : values) //星型for循环来循环同一个气象站的所有气温值,即将values的值一一传给IntWritable val
198 // IntWritable val是IntWritable(int的封装)变量
199
200 {//对所有气温值累加
201 sum += val.get();//去val里拿一个值,就sum下
202
203 // val.get()去拿值
204
205 count++;
206 }
207 result.set(sum / count);//设为v3
208 // result.set(sum / count)去设置,将sum / count的值,设给result
209 // sum是21299119 count是258616 = 82.3580869
210
211
212 context.write(key,result);//写入key是k3,result是v3
213 }
214 }
215
216
217
218 public int run(String[] args) throws Exception{
219 // TODO Auto-generated method stub
220 //第一步:读取配置文件
221 Configuration conf = new Configuration();//程序里,只需写这么一句话,就会加载到hadoop的配置文件了
222 //Configuration类代表作业的配置,该类会加载mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。
223
224 // new Configuration()
225
226 //第二步:输出路径存在就先删除
227 Path mypath = new Path(args[1]);//定义输出路径的Path对象,mypath
228
229
230 // new Path(args[1])将args[1]的值,给mypath
231
232 FileSystem hdfs = mypath.getFileSystem(conf);//程序里,只需写这么一句话,就可以获取到文件系统了。
233 //FileSystem里面包括很多系统,不局限于hdfs,是因为,程序读到conf,哦,原来是hadoop集群啊。这时,才认知到是hdfs
234
235 if (hdfs.isDirectory(mypath))//如果输出路径存在
236 {
237 hdfs.delete(mypath, true);//则就删除
238 }
239 //第三步:构建job对象
240 Job job = new Job(conf, "temperature");//新建一个任务,job名字是tempreature
241
242 // new Job(conf, "temperature")有这么个构造方法
243
244 job.setJarByClass(Temperature.class);// 设置主类
245 //通过job对象来设置主类Temperature.class
246
247 //第四步:指定数据的输入路径和输出路径
248 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径,args[0]
249 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径,args[1]
250
251 //第五步:指定Mapper和Reducer
252 job.setMapperClass(TemperatureMapper.class);// Mapper
253 job.setReducerClass(TemperatureReducer.class);// Reducer
254
255 //第六步:设置map函数和reducer函数的输出类型
256 job.setOutputKeyClass(Text.class);
257 job.setOutputValueClass(IntWritable.class);
258
259 //第七步:提交作业
260 return job.waitForCompletion(true)?0:1;//提交任务
261 }
262
263
264 /**
265 * @function main 方法
266 * @param args
267 * @throws Exception
268 */
269 public static void main(String[] args) throws Exception {
270 //第一步
271 // String[] args0 =
272 // {
273 // "hdfs://djt002:9000/inputData/temperature/",
274 // "hdfs://djt002:9000/outData/temperature/"
275 // };
276
277 String[] args0 = {"./data/temperature/","./out/temperature/"};
278
279 // args0是输入路径和输出路径的属组
280
281 //第二步
282 int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
283
284 // ToolRunner.run(new Configuration(), new Temperature(), args0)有这么一个构造方法
285
286 //第一个参数是读取配置文件,第二个参数是主类Temperature,第三个参数是输入路径和输出路径的属组
287 System.exit(ec);
288 }
289
290 }
291 |
|