设为首页 收藏本站
查看: 671|回复: 0

[经验分享] Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(十)

[复制链接]

尚未签到

发表于 2017-12-18 13:26:17 | 显示全部楼层 |阅读模式
1 package zhouls.bigdata.myMapReduce.TemperatureTest;  

  2  
  3 import java.io.IOException;
  
  4
  
  5 import org.apache.hadoop.io.IntWritable;
  
  6 import org.apache.hadoop.io.LongWritable;
  
  7 import org.apache.hadoop.io.Text;
  
  8 import org.apache.hadoop.mapreduce.Mapper;
  
  9 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  
10 import org.apache.hadoop.conf.Configuration;
  
11 import org.apache.hadoop.conf.Configured;
  
12 import org.apache.hadoop.fs.FileSystem;
  
13 import org.apache.hadoop.fs.Path;
  
14 import org.apache.hadoop.io.IntWritable;
  
15 import org.apache.hadoop.io.Text;
  
16 import org.apache.hadoop.mapreduce.Job;
  
17 import org.apache.hadoop.mapreduce.Mapper;
  
18 import org.apache.hadoop.mapreduce.Reducer;
  
19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
20 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  
21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
22 import org.apache.hadoop.util.Tool;
  
23 import org.apache.hadoop.util.ToolRunner;
  
24
  
25
  
26 /*
  
27 Hadoop内置的数据类型:
  
28     BooleanWritable:标准布尔型数值
  
29     ByteWritable:单字节数值
  
30     DoubleWritable:双字节数值
  
31     FloatWritable:浮点数
  
32     IntWritable:整型数
  
33     LongWritable:长整型数
  
34     Text:使用UTF8格式存储的文本
  
35     NullWritable:当<key, value>中的key或value为空时使用
  
36 */
  
37
  
38
  
39 /**
  
40  * 统计美国每个气象站30年来的平均气温
  
41  * 1、编写map()函数
  
42  * 2、编写reduce()函数
  
43  * 3、编写run()执行方法,负责运行MapReduce作业
  
44  * 4、在main()方法中运行程序
  
45  *
  
46  * @author zhouls
  
47  *
  
48  */
  
49                         //继承Configured类,实现Tool接口

  
50 public>
  
51     public static>  
52                                 //输入的key,输入的value,输出的key,输出的value
  
53         //输入的LongWritable键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。
  
54         
  
55 //        在这种情况下,我们将气象站id按 Text 对象进行读/写(因为我们把气象站id当作键),将气温值封装在 IntWritale 类型中。只有气温数据不缺失,这些数据才会被写入输出记录中。
  
56         
  
57         
  
58 //        map 函数的功能仅限于提取气象站和气温信息
  
59         
  
60         /**
  
61          * @function Mapper 解析气象站数据
  
62          * @input key=偏移量  value=气象站数据
  
63          * @output key=weatherStationId value=temperature
  
64          */
  
65         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
  
66                                 //map()函数还提供了context实例,用于键值对的输出  或者说 map() 方法还提供了 Context 实例用于输出内容的写入
  
67            
  
68 //                                就本示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是气象站id,输出值是气温(整数)。
  
69 //                同时context作为了map和reduce执行中各个函数的一个桥梁,这个设计和Java web中的session对象、application对象很相似
  
70            
  
71            
  
72             //第一步,我们将每行气象站数据转换为每行的String类型
  
73             String line = value.toString(); //每行气象数据
  
74 //            values是1980 12 01 00    78   -17 10237   180    21     1     0     0
  
75 //            line是"1980 12 01 00    78   -17 10237   180    21     1     0     0"
  
76            
  
77            
  
78             //第二步:提取气温值
  
79             int temperature = Integer.parseInt(line.substring(14, 19).trim());//每小时气温值
  
80                                  //需要转换为整形,截取第14位到19位,从第0位开始,trim()的功能是去掉首尾空格。
  
81                                 //substring()方法截取我们业务需要的值
  
82            
  
83 //            substring(start, stop)其内容是从 start 处到 stop-1 处的所有字符,其长度为 stop 减 start。
  
84            
  
85 //            如Hello world!    若是substring(3,7)        则是lo w
  
86            
  
87 //                                Integer.parseInt() 返回的是一个int的值。在这里, 给temperature
  
88            
  
89 //             new Integer.valueof()返回的是Integer的对象。
  
90 //             Integer.parseInt() 返回的是一个int的值。
  
91 //             new Integer.valueof().intValue();返回的也是一个int的值。
  
92            
  
93            
  
94            
  
95            
  
96            
  
97 //            1980 12 01 00    78   -17 10237   180    21     1     0     0
  
98                             //78是气温值
  
99            
  
100 //            temperature是78
  
101            
  
102 //            30yr_03103.dat
  
103 //            30yr_03812.dat
  
104 //            30yr_03813.dat
  
105 //            30yr_03816.dat
  
106 //            30yr_03820.dat
  
107 //            30yr_03822.dat
  
108 //            30yr_03856.dat
  
109 //            30yr_03860.dat
  
110 //            30yr_03870.dat
  
111 //            30yr_03872.dat
  
112            
  
113            
  
114 //            (0,1985 07 31 02   200    94 10137   220    26     1     0 -9999)
  
115 //            (62,1985 07 31 03   172    94 10142   240     0     0     0 -9999)
  
116 //            (124,1985 07 31 04   156    83 10148   260    10     0     0 -9999)
  
117 //            (186,1985 07 31 05   133    78 -9999   250     0 -9999     0 -9999)
  
118 //            (248,1985 07 31 06   122    72 -9999    90     0 -9999     0     0)
  
119 //            (310,1985 07 31 07   117    67 -9999    60     0 -9999     0 -9999)
  
120 //            (371,1985 07 31 08   111    61 -9999    90     0 -9999     0 -9999)
  
121 //            (434,1985 07 31 09   111    61 -9999    60     5 -9999     0 -9999)
  
122 //            (497,1985 07 31 10   106    67 -9999    80     0 -9999     0 -9999)
  
123 //            (560,1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999)
  
124            
  
125 //            (03103,[200,172,156,133,122,117,111,111,106,100])
  
126            
  
127 //            根据自己业务需要 , map 函数的功能仅限于提取气象站和气温信息
  
128            
  
129            
  
130 //            1998        #year
  
131 //            03            #month
  
132 //            09            #day
  
133 //            17            #hour
  
134 //            11            #temperature            感兴趣
  
135 //            -100        #dew
  
136 //            10237        #pressure
  
137 //            60            #wind_direction   
  
138 //            72            #wind_speed
  
139 //            0            #sky_condition   
  
140 //            0            #rain_1h
  
141 //            -9999        #rain_6h
  
142            
  
143            
  
144             if (temperature != -9999){//过滤无效数据   
  
145                 //第三步:提取气象站编号
  
146                 //获取输入分片
  
147                 FileSplit fileSplit = (FileSplit) context.getInputSplit();//提取问加你输入分片,并转换类型
  
148 //                    即由InputSplit   ->   FileSplit
  
149                 
  
150 //                context.getInputSplit()
  
151 //                (FileSplit) context.getInputSplit()这是强制转换
  
152 //                fileSplit的值是file:/D:/Code/MyEclipseJavaCode/myMapReduce/data/temperature/30yr_03870.dat:0+16357956
  
153 //                即,读的是30yr_03870.dat这个文件
  
154                 
  
155                 
  
156                 //然后通过文件名称提取气象站编号
  
157                 String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通过文件名称提取气象站id
  
158                         //首先通过文件分片fileSplit来获取文件路径,然后再获取文件名字,然后截取第5位到第10位就可以得到气象站 编号
  
159 //                        fileSplit.getPath()
  
160 //                        fileSplit.getPath().getName()
  
161                 
  
162 //                30yr_03870.dat   我们只需获取03870就是气象站编号
  
163
  
164 //                        fileSplit.getPath().getName().substring(5, 10)   //从0开始,即第5个开始截取,到第10个为止,第10个没有拿到,所以为03870
  
165 //                weatherStationId是03870
  
166                 
  
167                 
  
168                 
  
169                 context.write(new Text(weatherStationId), new IntWritable(temperature));//写入weatherStationId是k2,temperature是v2
  
170 //                context.write(weatherStationId,temperature);等价    ,但是若是直接这样写会出错,因为,    weatherStationId是String类型,注意与Text类型还是有区别的!        
  
171                         //气象站编号,气温值
  
172             }
  
173         }
  
174     }
  
175
  
176
  
177     

  
178     public static>  
179         private IntWritable result = new IntWritable();//存取结果
  
180                 //因为气温是IntWritable类型                       
  
181         public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
  
182 //            Iterable<IntWritable> values是iterable(迭代器)变量
  
183            
  
184            
  
185 //            Iterable<IntWritable> values和IntWritable values这样有什么区别?
  
186 //            前者是iterable(迭代器)变量,后者是intwriteable(int的封装)变量
  
187            
  
188            
  
189 //            Iterable<IntWritable> values
  
190 //            迭代器,valuses是 iterable(迭代器)变量,类型是IntWritable
  
191            
  
192            
  
193                         //reduce输出的key,key的集合,context的实例
  
194             //第一步:统计相同气象站的所有气温
  
195             int sum = 0;
  
196             int count = 0;
  
197             for (IntWritable val : values) //星型for循环来循环同一个气象站的所有气温值,即将values的值一一传给IntWritable val
  
198 //                IntWritable val是IntWritable(int的封装)变量
  
199                 
  
200             {//对所有气温值累加
  
201             sum += val.get();//去val里拿一个值,就sum下
  
202
  
203 //            val.get()去拿值
  
204            
  
205                 count++;
  
206             }
  
207             result.set(sum / count);//设为v3
  
208 //            result.set(sum / count)去设置,将sum / count的值,设给result
  
209 //            sum是21299119   count是258616  =  82.3580869  
  
210            
  
211            
  
212             context.write(key,result);//写入key是k3,result是v3
  
213         }
  
214     }
  
215
  
216     
  
217     
  
218     public int run(String[] args) throws Exception{
  
219         // TODO Auto-generated method stub
  
220         //第一步:读取配置文件
  
221         Configuration conf = new Configuration();//程序里,只需写这么一句话,就会加载到hadoop的配置文件了
  
222         //Configuration类代表作业的配置,该类会加载mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。
  
223         
  
224 //                new Configuration()
  
225
  
226         //第二步:输出路径存在就先删除
  
227         Path mypath = new Path(args[1]);//定义输出路径的Path对象,mypath
  
228         
  
229         
  
230 //        new Path(args[1])将args[1]的值,给mypath
  
231         
  
232         FileSystem hdfs = mypath.getFileSystem(conf);//程序里,只需写这么一句话,就可以获取到文件系统了。
  
233         //FileSystem里面包括很多系统,不局限于hdfs,是因为,程序读到conf,哦,原来是hadoop集群啊。这时,才认知到是hdfs
  
234         
  
235         if (hdfs.isDirectory(mypath))//如果输出路径存在
  
236         {
  
237             hdfs.delete(mypath, true);//则就删除
  
238         }
  
239         //第三步:构建job对象
  
240         Job job = new Job(conf, "temperature");//新建一个任务,job名字是tempreature
  
241         
  
242 //        new Job(conf, "temperature")有这么个构造方法
  
243         
  
244         job.setJarByClass(Temperature.class);// 设置主类
  
245         //通过job对象来设置主类Temperature.class
  
246         
  
247         //第四步:指定数据的输入路径和输出路径
  
248         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径,args[0]
  
249         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径,args[1]
  
250         
  
251         //第五步:指定Mapper和Reducer
  
252         job.setMapperClass(TemperatureMapper.class);// Mapper
  
253         job.setReducerClass(TemperatureReducer.class);// Reducer
  
254         
  
255         //第六步:设置map函数和reducer函数的输出类型
  
256         job.setOutputKeyClass(Text.class);
  
257         job.setOutputValueClass(IntWritable.class);   
  
258         
  
259         //第七步:提交作业
  
260         return job.waitForCompletion(true)?0:1;//提交任务
  
261     }
  
262
  
263
  
264     /**
  
265      * @function main 方法
  
266      * @param args
  
267      * @throws Exception
  
268      */
  
269     public static void main(String[] args) throws Exception {
  
270         //第一步
  
271 //        String[] args0 =
  
272 //            {
  
273 //            "hdfs://djt002:9000/inputData/temperature/",
  
274 //                    "hdfs://djt002:9000/outData/temperature/"
  
275 //            };
  
276         
  
277         String[] args0 = {"./data/temperature/","./out/temperature/"};
  
278         
  
279 //        args0是输入路径和输出路径的属组
  
280         
  
281         //第二步
  
282         int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
  
283         
  
284 //        ToolRunner.run(new Configuration(), new Temperature(), args0)有这么一个构造方法
  
285         
  
286         //第一个参数是读取配置文件,第二个参数是主类Temperature,第三个参数是输入路径和输出路径的属组
  
287         System.exit(ec);
  
288     }
  
289
  
290 }
  
291

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425381-1-1.html 上篇帖子: Hadoop HDFS编程 API入门系列之简单综合版本1(四) 下篇帖子: Ubuntu16.04 下 hadoop的安装与配置(伪分布式环境)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表