设为首页 收藏本站
查看: 925|回复: 0

[经验分享] Hadoop:The Definitive Guid 总结 Chapter 7 MapReduce的类型与格式

[复制链接]

尚未签到

发表于 2015-7-11 08:04:25 | 显示全部楼层 |阅读模式
  MapReduce数据处理模型非常简单:map和reduce函数的输入和输出是键/值对(key/value pair)
  
  1.MapReduce的类型
  Hadoop的MapReduce一般遵循如下常规格式:

  map(K1, V1) –> list (K2, V2)              
  combine(K2, list(V2)) –> list(K2, V2)
  partition(K2, V2) –> integer              
  reduce(K2, list(V2)) –> list(K3, V3)

  map:对数据进行抽去过滤数据,组织key/value对等操作.
  combine:为了减少reduce的输入和Hadoop内部网络数据传输负载,需要在map端对输出进行预处理,类似reduce。combine不一定适用任何情况,选用
  partition:将中间键值对划分到一个reduce分区,返回分区索引号。实际上,分区单独由键决定(值是被忽略的),分区内的键会排序,相同的键的所有值会合成一个组(list(V2))
  reduce:每个reduce会处理具有某些特性的键,每个键上都有值的序列,是通过对所有map输出的值进行统计得来的,reduce根据所有map传来的结果,最后进行统计合并操作,并输出结果。
  注:combine与reduce一样时,K3与K2相同,V3与V2相同。
  
  MapReduce的Java API代码:一般Combine函数与reduce的一样的



public class Mapper {
public class Context extends MapContext {
// ...
    }
protected void map(KEYIN key, VALUEIN value, Context context)
throws IOException, InterruptedException {
// ...
    }
}
public class Reducer {
public class Context extends
ReducerContext {
// ...
    }
protected void reduce(KEYIN key, Iterable values, Context context)
throws IOException, InterruptedException {
// ...
    }
}
  用于处理中间数据的partition函数 API代码:



public abstract class Partitioner {
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
  
  关于默认的MapReduce作业
  默认的map是Mapper,是一个泛型类型,简单的将所有输入的值和键原封不动的写到输出中,即输入输出类型相同。
  Mapper的实现



public class Mapper {
protected void map(KEYIN key, VALUEIN value, Context context)
throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
  
  默认的 partitioner是HashPartitioner,对每天记录的键进行哈希操作以决定该记录属于那个分区让reduce处理,每个分区对应一个reducer任务,所以分区数等于Job的reduce的个数
  HashPartitioner的实现



public class HashPartitioner extends Partitioner {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
  
  默认的reduce函数Reducer,也是泛型类型,简单的将所有输入写到输出中。记录在发给reduce之前,会被排序,一般是按照键值的大小排序。reduce的默认输出格式是TextOutputFormat----它将键和值转换成字符串并用Tab进行分割,然后一条记录一行地进行输出。
  Reducer 的实现



public class Reducer {
protected void reduce(KEYIN key, Iterable values, Context context)
throws IOException, InterruptedException {
for (VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
}
  选择reduce的个数:一般集群的总共的slot个数等于node的数目乘以每个node上的slot数目,而reduce的数目一般设置为比总slot数目少一些
  默认MapReduce函数实例程序



public class MinimalMapReduceWithDefaults extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(Mapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
System.exit(exitCode);
}
}
  

  
  关于默认的stream作业(Stream概念见第二章)
  stream最简单的形式:

  % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
     -input input/ncdc/sample.txt \
     -output output \
     -mapper /bin/cat

  注意,必须提供一个mappe:默认的identity mapp不能在stream工作
  这里再给出更多设置的stream形式,其他详见权威指南:

  % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
    -input input/ncdc/sample.txt \
    -output output \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -mapper /bin/cat \
    -partitioner org.apache.hadoop.mapred.lib.HashPartitioner \
    -numReduceTasks 1 \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -outputformat org.apache.hadoop.mapred.TextOutputFormat

  
  关于Streaming中的键和值
  Streaming用分隔符用于通过标准输入把key/value对转换为一串比特发送到map或reduce函数
  默认时,用Tab分隔符,也可以根据需要,用配置的分隔符来进行分割,例如:来自输出的键可以由一条记录的前n个字段组成(stream.num.map.output.key.fields或stream.num.reduce.output.key.fields定义),剩下的就是值,eg,输出的是"a,b,c",n=2,则键为"a、b",而值是"c"。Map和Reduce的分隔符是相互独立进行配置的,参见下图
DSC0000.jpg
DSC0001.jpg
  
  
  2.输入格式
  1).输入分片与记录
  一个输入分片(input split)是由单个map处理的输入块,即每一个map只处理一个输入分片,每个分片被划分为若干个记录(records),每条记录就是一个key/value对,map一个接一个的处理每条记录,输入分片和记录都是逻辑的,不必将他们对应到文件上。注意,一个分片不包含数据本身,而是指向数据的引用和。
  输入分片在Java中被表示为InputSplit借口



public abstract class InputSplit {
public abstract long getLength() throws IOException, InterruptedException;
public abstract String[] getLocations() throws IOException,InterruptedException;
}
  InputFormat负责创建输入分片并将它们分割成记录,下面就是原型用法:



public abstract class InputFormat {
public abstract List getSplits(JobContext context)
throws IOException, InterruptedException;
public abstract RecordReader createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException;
}
  
  客户端通过调用getSpilts()方法获得分片数目,在TaskTracker或NM上,MapTask会将分片信息传给InputFormat的createRecordReader()方法,进而这个方法来获得这个分片的RecordReader,RecordReader基本就是记录上的迭代器,MapTask用一个RecordReader来生成记录的key/value对,然后再传递给map函数,如下代码



public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
  此处的Context实现接口MapContextImpl,并且封装调用RecordReader下面的经过实现的方法,包括nextKeyValue,getCurrentKey,getCurrentValue。nextKeyValue()方法反复被调用用来为mapper生成key/value对,然后把这些key/value传递给map()方法,直到独到stream的末尾,此时nextKeyValue返回false
  
  A.FileInputFormat类
  FileInputFormat是所有使用文件为数据源的InputFormat实现的基类,它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现,把分片割成记录的作业由其子类来完成。
  下图为InputFormat类的层次结构:
DSC0002.jpg
  
  B.FileInputFormat类输入路径
  FileInputFormat提供四种静态方法来设定Job的输入路径,其中下面的addInputPath()方法addInputPaths()方法可以将一个或多个路径加入路径列表,setInputPaths()方法一次设定完整的路径列表(可以替换前面所设路径)



public static void addInputPath(Job job, Path path);
public static void addInputPaths(Job job, String commaSeparatedPaths);
public static void setInputPaths(Job job, Path... inputPaths);
public static void setInputPaths(Job job, String commaSeparatedPaths);
  如果需要排除特定文件,可以使用FileInputFormat的setInputPathFilter()设置一个过滤器:



public static void setInputPathFilter(Job job, Class

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85326-1-1.html 上篇帖子: Ganglia监控Hadoop及Hbase集群性能(安装配置) 下篇帖子: 【hadoop代码笔记】Mapreduce shuffle过程之Map输出过程
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表