设为首页 收藏本站
查看: 918|回复: 0

[经验分享] Hadoop学习笔记(二):MapReduce的特性-计数器、排序

[复制链接]

尚未签到

发表于 2015-7-13 09:49:36 | 显示全部楼层 |阅读模式
    计数器
  计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。说白了就是统计整个mr作业所有数据行中符合某个if条件的数量,(除某些内置计数器之外)。仅当一个作业执行成功之后,计数器的值才是完整可靠的。如果一个任务在作业执行期间失败,则相关计数器值会减小,计数器是全局的。
  计数器分为以下几种:  
  1)内置计数器,内置的作业计数器实际上由jobtracker维护,而不必在整个网络中发送;
  2)用户自定义的java计数器,由其关联任务维护,并定期传到tasktracker,再由tasktracker传给jobtracker,可以定义多个枚举类型,每个枚举类型有多个字段,枚举类型名称即为组名,枚举字段名称即为计数器名称。
  Reporter对象的incrCounter()方法重载: public void incrCounter(enum,long amout)
  3)动态计数器,不由java枚举类型定义的计数器,由于在编译阶段就已指定java枚举类型的字段,故无法使用枚举类型动态新建计数器。
  Reporter对象的incrCounter()方法重载: public void incrCounter(String group,String counter,long amout)
  自定义java计数器由于使用枚举类型,默认名称为(枚举名称)$(枚举字段名称),可读性较差,可以通过设置一个属性文件将计数器重命名。属性文件名称为(使用该计数器的类名)_(组名).properties,文件内容见下面的代码吧,不好描述了~
  代码中使用了自定义java计数器和动态计数器:



import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import common.JobBuilder;
import common.NcdcRecordParser;
public class TestTemperatureWithCounters extends Configured implements Tool {
enum Temperature {
MISSING, MALFORMED, VALIDATED
}
static class MaxTemperatureMapperWithCounters extends MapReduceBase
implements Mapper {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
public void map(LongWritable key, Text value,
OutputCollector output, Reporter reporter)
throws IOException {
parser.parse(value);
if (parser.isValidTemperature()) {
int airTemperature = parser.getAirTemperature();
output.collect(new Text(parser.getYear()), new IntWritable(
airTemperature));
reporter.incrCounter(Temperature.VALIDATED, 1);
} else if (parser.isMalformedTemperature()) {
reporter.incrCounter(Temperature.MALFORMED, 1);
} else if (parser.isMissingTemperature()) {
reporter.incrCounter(Temperature.MISSING, 1);
}
// dynarnic counter
reporter.incrCounter("TemperatureCountByYear", parser.getYear(), 1);
}
}
static class MaxTemperatureReducer extends MapReduceBase implements
Reducer {
@Override
public void reduce(Text key, Iterator values,
OutputCollector output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (conf == null) {
return -1;
}
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MaxTemperatureMapperWithCounters.class);
conf.setReducerClass(MaxTemperatureReducer.class);
JobClient.runJob(conf);
return 0;
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new TestTemperatureWithCounters(), args);
System.exit(exitCode);
//args
//hdfs://192.168.174.128:9000/user/root/input/Temperature.txt hdfs://192.168.174.128:9000/user/root/output/testCounters0006
    }
}
  这里提一句,在写的时候犯了一个低级错误:由于在帮助类NcdcRecordParser中开始是将boolean isMalformed=false; 变量声明处直接设置初始值,而不是在parse方法中设置为false。而在引用的MaxTemperatureMapperWithCounters类中却是在map函数之外实例化NcdcRecordParser的,导致当第一次isMalformed赋值为true之后,值一直为true,(由于同一个tasktracker调用的同一个map作业中,只会实例化一次Map类,为输入分片中每条记录调用一次map函数)马虎大意了~ 记录一下。
  下面贴上控制台输出的计数器结果截图:(未使用配置文件重命名自定义java计数器)
DSC0000.jpg
  重命名自定义java计数器配置文件:(名称为TestTemperatureWithCounters_Temperature.properties)



CounterGroupName= Air Temperature Records
MISSING.name=Missing
MALFORMED.name=Malformed
VALIDATED.name=Validated
  使用配置文件重命名自定义java计数器截图:
DSC0001.jpg
  
  排序
  个人感觉排序是整个MR作业中比较重要的一块,所以需要反复研究,透彻研究!
  部分排序:
  MapReduce的每个reducer的输入都是按键排序的。系统执行排序的过程-将Map的输出作为输入传递给reducer---称为shuffle。
  整个排序过程分为map端操作和reduce端操作;
  1)在map端:map函数的输出将利用缓冲的方式写到内存,并进行预排序。在这个过程中,如果map的输出大于环形内存缓冲区的阀值(缓冲区默认大小100M,阀值默认为80%),则会将写入溢出文件。当输入分片执行map函数全部结束,即写入内存和溢出文件结束后,将执行patitioner(这个是重点。。。。。。)
  patitioner执行后会将输出合并成一个已分区且已排序的输出文件,这是排序操作的第一步。(在这个输出文件写入到本地磁盘之前,将执行combiner操作和压缩操作,ps:如果有这些操作的话~)
  这是map端的排序了。这样可以保证每个map到所有reduce的分区都是安key值已排序的。那么reduce接收的将是多个已排序的输入分区数据。(ps:并不是连在一起整个排好序的文件了~) 比如:假设存在map01,map02,reduce01,map01输出给reduce01的分区数据key值为01,03,05;map02输出给reduce01的分区数据key值位02,04,06。
  2)在reduce端:map端输出文件位于运行map任务的tasktracker的本地磁盘,reduce端通过线程定期询问jobtracker以获得map输出的位置。复制这些map端输出文件到reduce端,并执行合并操作。将已排序的map端输出合并为一个或者多个合并后文件作为reduce操作的输入。(合并后的文件依然是排好序的;合并操作次数由io.sort.factor属性控制)
  这里面有几个地方需要注意:1) map函数和reduce函数应该尽量少用内存,避免在map中堆积数据(需要留给shuffle~)
  2) 避免map端多次溢出写磁盘,来获得最佳性能(估算map操作输出的大小,然后通过合理设置各项属性~)
  全排序:
  部分排序的reduce输入是有序的,但是多个reduce之间并不是一个接一个的有序的。所以reduce的输出是不能拼接起来有序的。(除非把reduce个数设置为1,但是这样就体现不出分布式的优势了~)
  为了达到全排序的效果,需要自定义map操作输出时候的partion操作,让key值不同区间的数据落到不同区间,这样就保证了每个reduce的输入数据集和其他的reduce输入数据集连接起来是有序的。就实现了全排序的效果!比如:让key值属于[0,10)的数据非配到一个partition,[10,20)的数据分配到一个partition,以此类推。这样将reduce的输出文件直接拼接就能得到一个完整有序的最终全排序的结果了!
  但是这里有个问题,由于数据是不规律的,向上面提到的区间[0,10)[10,20)这样划分区间肯定无法保证每个区间中的数据是均匀分布的,这样整个MR操作的时间将由最多数据的那个reduce决定,这样肯定是不可取的。为了达到这个目的,有一个取样器的概念。所谓取样器就是按一定规则从整个大数据集中取样出一小部分数据来观察大致的分布用来划分区间。以达到分区后每个reduce的输入能大概均匀分布的目的。
  上代码:



import java.net.URI;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import common.JobBuilder;
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
implements Tool {
@Override
public int run(String[] args) throws Exception {
JobConf conf = JobBuilder.parseInputAndOutput(this, this.getConf(),
args);
if (conf == null) {
return -1;
}
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
//        SequenceFileOutputFormat.setCompressOutput(conf, true);
//        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
//        SequenceFileOutputFormat.setOutputCompressionType(conf,CompressionType.BLOCK);

conf.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.Sampler sampler = new InputSampler.RandomSampler(
0.1, 10000, 10);
Path input = FileInputFormat.getInputPaths(conf)[0];
input = input.makeQualified(input.getFileSystem(conf));
Path partitionFile = new Path(input, "_partitions");
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
InputSampler.writePartitionFile(conf, sampler);
URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);
JobClient.runJob(conf);
return 0;
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new SortByTemperatureUsingTotalOrderPartitioner(), args);
System.exit(exitCode);
//hdfs://192.168.174.128:9000/user/root/input/Temperature-seq -D mapred.reduce.tasks=3 hdfs://192.168.174.128:9000/user/root/output/Temperature-totalsort
    }
}
  
  由于我现在还是在伪分布式环境开发,只能有一个map一个reduce 这里设置tasks=3会报错了,而且本地由于只有一个reduce,其实这个所谓的全排序在我这里是看不到啥实际意义了。等过段时间装个分布式环境吧。
  其中 writePartitionFile(JobConf job, InputSampler.Sampler sampler)这个函数是将采样的结果排序,然后按照分区的个数n,将排序后的结果平均分为n分,取n-1个分割点,这个分割点具体取的时候,运用了一些4舍5入的方法,最简答的理解就是取后n-1个组中每组的第一个采样值就OK了。
  这块需要再研究研究~
   辅助排序
  辅助排序使用到的几个关键函数:


    setPartitionerClass(class);// 自定义分区

    setOutputValueGroupingComparator(class);//提供分组规则,使得不同 Key的中间数据,能够被分发到同一个reduce 处理,在下面的例子中即是将同一年份数据分发到同一reduce ;这里设置group,就是按key的哪一个参数进行聚合,

    setOutputKeyComparatorClass(class);//中间数据在被reduce处理之前,会先被集中排序,setOutputKeyComparatorClass提供排序规则,在下面的例子中即是将同一reduce接收到的数据以年份升序,温度降序排序;这个是设置key的比较器,设置聚合的key的一个二次排序方法      代码中:key为组合(年份,温度) value为nullwritable
     



import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import common.IntPair;
import common.JobBuilder;
import common.NcdcRecordParser;
public class MaxTemperatureUsingSecondarySort extends Configured implements
Tool {
static class MaxTemperatureMapper extends MapReduceBase implements
Mapper {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
public void map(LongWritable key, Text value,
OutputCollector output, Reporter reporter)
throws IOException {
parser.parse(value);
if (parser.isValidTemperature()) {
output.collect(
new IntPair(parser.getYearInt(), parser
.getAirTemperature()), NullWritable.get());
}
}
}
static class MaxTemperatureReducer extends MapReduceBase implements
Reducer {
@Override
public void reduce(IntPair key, Iterator values,
OutputCollector output, Reporter reporter)
throws IOException {
output.collect(key, NullWritable.get());
}
}
public static class FirstPartitioner implements
Partitioner {
@Override
public void configure(JobConf conf) {
}
@Override
public int getPartition(IntPair key, NullWritable value,
int numPartitions) {
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
public static class KeyComparator extends WritableComparator {
protected KeyComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int cmp = Integer.compare(ip1.getFirst(), ip2.getFirst());
if (cmp != 0) {
return cmp;
}
return -Integer.compare(ip1.getSecond(), ip2.getSecond());
}
}
public static class GroupComparator extends WritableComparator {
protected GroupComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
return Integer.compare(ip1.getFirst(), ip2.getFirst());
}
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (conf == null)
return -1;
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setPartitionerClass(FirstPartitioner.class);
conf.setOutputKeyComparatorClass(KeyComparator.class);
conf.setOutputValueGroupingComparator(GroupComparator.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(IntPair.class);
conf.setOutputValueClass(NullWritable.class);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(),
args);
System.exit(exitCode);
}
}
  
  这里有些地方不大理解,如果使用默认reduce的情况下(注释掉conf.setReducerClass这一行)不理解为啥所有记录的key都成了温度最大的那条记录的key,即同一年份第一条记录的key,
  如果继续使用默认的OutputValueGroupingComparator(由于现在只有一个reduce,故注释掉不会影响现有逻辑,注释掉conf.setOutputValueGroupingComparator这一行),才会是所有记录年份升序,温度降序排列的结果。
  需要在分布式上执行多个map,reduce的时候试一下,到时候需要再研究研究。
  (之前看到别人的博客上有提到过好像是 reduce如果判断为key值相等的情况下,则会只使用第一条记录的key,不会再去取之后记录的key,好像是这个原因了,我试验了下将GroupComparator.class中的逻辑改成和KeyComparator.class中逻辑一致,将会输出所有记录去掉重复行后的年份升序,温度降序排列的结果)                 好像又不是这么理解的,暂时留着吧,有时间继续查资料了~
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-86154-1-1.html 上篇帖子: 第五章 MyEclipse配置hadoop开发环境 下篇帖子: Hadoop下进行反向索引(Inverted Index)操作
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表