设为首页 收藏本站
查看: 575|回复: 0

[经验分享] Hadoop应用系列2--MapReduce原理浅析(下)

[复制链接]

尚未签到

发表于 2016-12-11 08:45:50 | 显示全部楼层 |阅读模式
上面我们分析气温的那段程序,看起没有问题,用起来也没有问题。
试想一下,如果我们把全球所有气象站的数据拿来分析, 你的程序大约需要多久能计算出结果?或者说能否完成运算?
以前我们会把来自不同气象站的数据在不同时间,或者不同计算机上进行运算,最后把结果拿来,再次执行运算。
有了MapReduce咱就不用这么麻烦了,MapReduce做了这样几件事:
0、分布式并行
1、他把输入和输出分开。Mapper负责读取数据,把需要计算的数据输出给Reducer,
也就是说,我们刚才写的程序addYearAndTemperature和out他们是天然的2个程序。
2、本身就是为分布式而设计的,他会把来自多个输入(map)的结果自动的合并并输出(reduce)
3、总是读取本地数据进行运算(相当于我们让不同的计算机分析来自不同气象站的数据)
不知道我说清楚了没有,如果有不清楚的地方欢迎讨论和拍砖。
下面我们来使用hadoop来编写MapReduce程序,来完成同样的功能,注意Mapper是Reducer的输入哦。
我们先来编写Mapper吧,就是把年份和气温取出的程序。

package org.pcwl.mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* <p><b>用于读取气象信息,从气象信息中分解出年份和气温的程序。</b></p>
* <p>
* MapReduce的mapper需要继承Mapper<T1,T2,T3,T4>类
* 重写map(T1,T2,T3,T4)方法
* </p>
* <p>
* T1表示mapper的输入key类型,T2表示mapper输入的value类型,我们可以把他理解为
* <em>hadoop的文件读取其,把数据读进来之后装入到一个java.util.Map<K,V>中,
* 我们从这个java.util.Map<K,V>中读取数据, 这里T1就是这个K的类型.T2就是V的类型</em>
* <i>
* 注意:输入到mapper中的数据是以行为单位的(\n) 你懂的。
*  value是当前行,key可以看作是行号
* </i>
* </p>
* <p>
* T3,T4表示mapper的输出类型,也是Reducer的输入类型,我们可以理解为<em>
*  mapper程序把需要reducer计算的数据写入一个java.util.Map<K,V>中,
*  T3,表示这个K的类型,T4表示这个V的类型。
* </em>
* </p>
* <p>
* <strong>
* 注意:这里类型不能使用java自身的类型,
*  我们使用<i>org.apache.hadoop.io</i>这个包下的类型
*  ,他们是为分布式并行而设计的类型
*  LongWritable---视为java.lang.Long的替代类型
*  Text---视为java.lang.String的替代类型
*  IntWritable---视为java.lang.Integer的替代类型
*  </strong>
* </p>
* @author project_maker@sina.com;eastzhang.iyunv.com
*
*/
public class Temperature  extends Mapper<LongWritable, Text, Text, IntWritable> {
/* 已知的错误数据值 */
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/* 从value取出值,得到String */
String line = value.toString();
/* 从字符串中提出年份信息 */
String year = line.substring(15, 19);
int temperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
temperature = Integer.parseInt(line.substring(88, 92));
} else {
temperature = Integer.parseInt(line.substring(87, 92));
}
if (temperature != MISSING) {
/* 把合法数据送入到reducer */
context.write(new Text(year), new IntWritable(temperature));
}
}
}


到此我的mapper编写完成,记住它相当于 addYearAndTemperature 这个哦。
接下来我们写 Reducer , 它想当于out

package org.pcwl.reducer;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* <p>Reducer 程序需要继承<code>org.apache.hadoop.mapreduce.Reducer<T1,T2,T3,T4></code>
* 重写reduce<T1,Iterable<T2>,T3,T4>方法。
* <p>
* <p>
* 这里T1,T2 是 Reducer的输入类型,也是Mapper的输出类型,
*  <b>注意map的输出和reduce的输入必须一致哟.<b>
*  T3,T4是Reducer的输出类型,也就是写入文件中数据类型。
*  根据你程序要写入文件的结果确定类型吧。
* </p>
* @author project_maker@sina.com;eastzhang.iyunv.com
*
*/
public class Temperature
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
/**
* <strong>注意这里values是个迭代器</strong>也就是说,Reducer是对一个集合进行处理,
* 这一点在@see http://eastzhang.iyunv.com/blog/1775734有体现
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
/* 要计算最高气温,我们来个无穷小,作为比较量 */
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
/* key 是 年份, 我们把结果写入到文件中 */
context.write(key, new IntWritable(maxValue));
}
}

到此, 我们Reducer程序编写完成。
让MapReduce程序运行, 我们需要些调度器(job)作业。

package org.pcwl.job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 运行MapReduce的程序(job)作业
* @Author project_maker@sina.com; eastzhang@iyunv.com
*/
public class Temperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("请输入数据源文件夹<以存在>和目标文件夹<不存在>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(Temperature.class);
job.setJobName("计算最高气温");
/* 指定从何出读取数据 */
FileInputFormat.addInputPath(job, new Path(args[0]));
/* 指定结果写入何处,注意文件夹必须不存在 */
FileOutputFormat.setOutputPath(job, new Path(args[1]));
/* 设置Mapper程序 */
job.setMapperClass(org.pcwl.mapper.Temperature.class);
/* 设置Reducer程序 */
job.setReducerClass(org.pcwl.reducer.Temperature.class);
/* 指定输出结果类型 */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


到此我们这个MapReduce就搞定了, 打jar包运行它
DSC0000.png
DSC0001.png
DSC0002.png
DSC0003.png
下一篇我们将要研究一下Combiner

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312564-1-1.html 上篇帖子: 基于Hadoop系统的MapReduce数据流优化 下篇帖子: Hadoop源代码分析(*IDs类和*Context类)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表