设为首页 收藏本站
查看: 925|回复: 0

[经验分享] Hadoop:The Definitive Guid 总结 Chapter 5 MapReduce应用开发

[复制链接]

尚未签到

发表于 2015-7-13 09:27:24 | 显示全部楼层 |阅读模式
  用MapReduce来编写程序,有几个主要的特定流程,首先写map函数和reduce函数,最好使用单元测试来确保函数的运行符合预期,然后,写一个驱动程序来运行作业,要看这个驱动程序是否可以运行,之后利用本地IDE调试,修改程序
  实际上权威指南的一些配置已经过时 所以这里很多地方不做介绍
  
  1.配置API
  Hadoop拥有很多xml配置文件,格式遵从一般xml的要求 见实例







color
yellow
Color


size
10
Size


weight
heavy
true
Weight


size-weight
${size},${weight}
Size and weight


  访问属性的方法:



Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
assertThat(conf.get("color"), is("yellow"));
assertThat(conf.getInt("size", 0), is(10));
assertThat(conf.get("breadth", "wide"), is("wide"));
  
Hadoop允许多个配置文件进行合并:







size
12


weight
light


  源文件按顺序填到Configuration:



Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
conf.addResource("configuration-2.xml");
  后来添加到源文件的属性会覆盖之前定义的属性,另外在上面的配置文件中,如果覆盖设置fina为true的property,则会出现配置错误,标记final为true的属性说明不希望客户端更改这个属性
  关于可变的扩展:配置属性可以用其他属性或系统属性进行定义,而且系统属性的优先级高于源文件中定义的属性:



System.setProperty("size", "14");
assertThat(conf.get("size-weight"), is("14,heavy"));
  该特性用于使用JVM参数-Dproperty=value来覆盖命令方式下的属性
  
  
  2.配置开发环境
  1).配置管理
  权威指南给出了示例,实际上hadoop官方网站更具有权威性 如欲了解Hadoop2.0的配置参考示例请见:http://hadoop.apache.org/common/docs/r2.0.0-alpha/
  2).辅助类GenericOptionsParser, Tool和ToolRunner
  Hadoop提供了辅助类,GenericOptionsParser:用来解释常用的Hadoop命令选项,但是一般更常用的方式:实现Tool接口,通过ToolsRunner来运行程序,ToolRunner内部调用GenericOptionsParser
  Tool实现示例用于打印一个Configuration对象的属性:



public interface Tool extends Configurable {
int run(String[] args) throws Exception;
}


public class ConfigurationPrinter extends Configured implements Tool {
static {
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
for (Entry entry : conf) {
System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
}
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
System.exit(exitCode);
}
}
  
在Hadoop中 -D选项可以把默认属性放入配置文件中,然后在需要时,用-D选项来覆盖它们,注意的是,这个不同于JVM系统属性设置Java命令 -Dproperty=value,JVM中的-D与属性没有空格
  
  下面给出GenericOptionsParser选项和ToolRunner选项
DSC0000.jpg
  
  3).编写单元测试
  以下程序可以在IDE Eclipse中运行
  mapper的测试实例:



import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.*;
public class MaxTemperatureMapperTest {
@Test
public void processesValidRecord() throws IOException, InterruptedException {
Text value = new Text(
"0043011990999991950051518004+68750+023550FM-12+0382" +
// Year ^^^^
"99999V0203201N00261220001CN9999999N9-00111+99999999999");
// Temperature ^^^^^
new MapDriver()
.withMapper(new MaxTemperatureMapper()).withInputValue(value)
.withOutput(new Text("1950"), new IntWritable(-11)).runTest();
}
}
  
最终的Mapper函数:



public class MaxTemperatureMapper extends
Mapper {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
String temp = line.substring(87, 92);
if (!missing(temp)) {
int airTemperature = Integer.parseInt(temp);
context.write(new Text(year), new IntWritable(airTemperature));
}
}
private boolean missing(String temp) {
return temp.equals("+9999");
}
}
  
reducer的测试函数



import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.*;
public class MaxTemperatureMapperTest {
@Test
public void returnsMaximumIntegerInValues() throws IOException,
InterruptedException {
new ReduceDriver()
.withReducer(new MaxTemperatureReducer())
.withInputKey(new Text("1950"))
.withInputValues(
Arrays.asList(new IntWritable(10), new IntWritable(5)))
.withOutput(new Text("1950"), new IntWritable(10)).runTest();
}
}
  
  最后的reducer函数实现



public class MaxTemperatureReducer extends
Reducer {
@Override
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
  
  
  3.本地运行测试数据
  1).本地运行Job
  Job驱动程序查找最高气温



public class MaxTemperatureDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s [generic options]  \n",
getClass().getSimpleName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
Job job = new Job(getConf(), "Max temperature");
job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
System.exit(exitCode);
}
}
  命令运行驱动程序:

  % mvn compile
% export HADOOP_CLASSPATH=target/classes/
% hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml input/ncdc/micro output

  这里给出权威指南上的parse函数



public class NcdcRecordParser {
private static final int MISSING_TEMPERATURE = 9999;
private String year;
private int airTemperature;
private String quality;
public void parse(String record) {
year = record.substring(15, 19);
String airTemperatureString;
// Remove leading plus sign as parseInt doesn't like them
if (record.charAt(87) == '+') {
airTemperatureString = record.substring(88, 92);
} else {
airTemperatureString = record.substring(87, 92);
}
airTemperature = Integer.parseInt(airTemperatureString);
quality = record.substring(92, 93);
}
public void parse(Text record) {
parse(record.toString());
}
public boolean isValidTemperature() {
return airTemperature != MISSING_TEMPERATURE
&& quality.matches("[01459]");
}
public String getYear() {
return year;
}
public int getAirTemperature() {
return airTemperature;
}
}
  利用上面的parser函数mapper函数可以写成下面形式



public class MaxTemperatureMapper extends
Mapper {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValidTemperature()) {
context.write(new Text(parser.getYear()),
new IntWritable(parser.getAirTemperature()));
}
}
}
  
  
  
2).测试驱动程序
  需要关注的是 在下面程序中,checkOutput()方法被调用用以逐行对比实际输出与与其输出



@Test
public void test() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
conf.set("mapred.job.tracker", "local");
Path input = new Path("input/ncdc/micro");
Path output = new Path("output");
FileSystem fs = FileSystem.getLocal(conf);
fs.delete(output, true); // delete old output
MaxTemperatureDriver driver = new MaxTemperatureDriver();
driver.setConf(conf);
int exitCode = driver.run(new String[] {
input.toString(), output.toString() });
assertThat(exitCode, is(0));
checkOutput(conf, output);
}
  
  
  4.集群上的运行
  以下会列出一些命令 但是最好还是参照Hadoop官方网站为佳
  1).打包
  新版的Hadoop 2.0用mvn对Hadoop进行打包 其实也可以用Eclipse打包 两者方法在实际中都可以,mav命令:

  % mvn package -DskipTests

  配置打包过程中注意对HADOOP_CLASSPATH的设置,和依赖包的导入等 详见上面 Hadoop官方网站
  
  2).Job的启动
  Job类中的waitForCompletion()启动Job并轮询检查Job的运行进程
  
  3)Job、Task和Task Attempt ID
  Job的ID一般来源本地时间 例如:job_200904110811_0002(0002,Job的ID从1开始)
  Task隶属于Job 所以Task的ID是以Job的ID为前缀,然后加上一个后缀,表示Job下的哪一个Task,例如:task_200904110811_0002_m_000003(000003,Task的ID从0开始)
  Task Attempt是由Task的生成 自然Task AttemptID的前缀为Task的ID,之后加上后缀,后面表示表示失败后尝试的次数,例如:attempt_200904110811_0002_m_000003_0(0,Task Attempt的ID从0开始)
  
  3).MapReduce的Web页面
  因为Hadoop经过改版一些web的页面的URL也不断变化,所以这个需要参照Hadoop的网站为佳
  
  4).获取结果
  hadoop fs 命令中的-getmerge,可以得到源模式目录中的所有文件,并在本地系统上将它们合并成一个文件,实例如下:


% hadoop fs -getmerge max-temp max-temp-local
% sort max-temp-local | tail
1991           607
1992           605
1993           567
1994           568
1995           567
1996           561
1997           565
1998           568
1999           568
2000           558
  
  5).作业调试
  可以利用Hadoop输出的log文件和一些其他信息(例如计数器等工具),进行调试,并用web页面查看调试后的结果
  关于远程调试器:可以用JVM选项,Java profiling够工具,IsolationRunner工具还有,如果为了监视失败作业的情况,可以设置keep.failed.task.files为true
  
  
  5.作业调优
  作业调优表:
DSC0001.jpg
DSC0002.jpg
  对Job程序的修改可以启用HPROF工具,另外也有其他分析工具帮助调优,例如:DistributedCache等等
  
  
  6.MapReduce的工作流
  1).将问题分解成MapReduce作业
  需要注意的是:对于十分复杂的问题 可以使用Hadoop自带ChainMapper类库将它们连接成一个Mapper,结合使用ChainReducer,这样就可以在一个MapReduce作业中运行一系列的mapper,再运行一个reducer和另一个mapper链。
  
  2).运行独立的Job
  管理作业的执行顺序。其中主要考虑的是:是否有一个线性的作业链或一个更复杂的作业有向无环图(DAG)
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-86097-1-1.html 上篇帖子: hadoop配置文件说明 下篇帖子: 一个4节点Hadoop集群的配置示例
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表