设为首页 收藏本站
查看: 731|回复: 0

[经验分享] 如何使用hadoop对海量数据进行统计并排序

[复制链接]

尚未签到

发表于 2016-12-12 08:52:51 | 显示全部楼层 |阅读模式
不得不说,Hadoop确实是处理海量离线数据的利器,当然,凡是一个东西有优点必定也有缺点,hadoop的缺点也很多,比如对流式计算,实时计算,DAG具有依赖关系的计算,支持都不友好,所以,由此诞生了很多新的分布式计算框架,Storm,Spark,Tez,impala,drill,等等,他们都是针对特定问题提出一种解决方案,新框架的的兴起,并不意味者他们就可以替代hadoop,一手独大,HDFS和MapReduce依旧是很优秀的,特别是对离线海量数据的处理。
hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等。
散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子。

下面进入正题,先来分析下散仙这个例子的需求,总共需要二步来完成,第一步就是对短语的统计,第二步就是对结果集的排序。所以如果使用MapReduce来完成的话,就得需要2个作业来完成这件事情,第一个作业来统计词频,第二个来负责进行排序,当然这两者之间是有依赖关系的,第二个作业的执行,需要依赖第一个作业的结果,这就是典型的M,R,R的问题并且作业之间具有依赖关系,这种问题使用MapReduce来完成,效率可能有点低,如果使用支持DAG作业的Tez来做这件事情,那么就很简单了。不过本篇散仙,要演示的例子还是基于MapReduce来完成的,有兴趣的朋友,可以研究一下使用Tez。

对于第一个作业,我们只需要改写wordcount的例子,即可,因为散仙的需求里面涉及短语的统计,所以定义的格式为,短语和短语之间使用分号隔开,(默认的格式是按单词统计的,以空格为分割符)在map时只需要,按分号打散成数组,进行处理即可,测试的数据内容如下:
DSC0000.jpg
DSC0001.jpg
map里面的核心代码如下:
  /**
* 统计词频的map端
* 代码
*
* **/
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String [] data=value.toString().split(";");//按每行的分号拆分短语
for(String s:data){
if(s.trim().length()>0){//忽略空字符
word.set(s);
context.write(word, one);
}
}
}

reduce端的核心代码如下:
/**
* reduce端的
* 代码
* **/
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();//累加词频
}
result.set(sum);
context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔
}
}
main函数里面的代码如下:
  public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
运行结果,如下所示:
a good student::1
good student::3
patient::2
patient a::1


下面,散仙来分析下排序作业的代码,如上图所示hadoop默认的排序,是基于key排序的,如果是字符类型的则基于字典表排序,如果是数值类型的则基于数字大小排序,两种方式都是按默认的升序排列的,如果想要降序输出,就需要我们自己写个排序组件了,散仙会在下面的代码给出例子,因为我们是要基于词频排序的,所以需要反转K,V来实现对词频的排序,map端代码如下:
/**
* 排序作业
* map的实现
*
* **/
@Override
protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
String s[]=value.toString().split("::");//按两个冒号拆分每行数据
word.set(s[0]);//
one.set(Integer.parseInt(s[1].trim()));//
context.write(one, word);//注意,此部分,需要反转K,V顺序
}
reduce端代码如下:
/***
*
* 排序作业的
* reduce代码
* **/
@Override
protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)
throws IOException, InterruptedException {
for(Text t:arg1){
result.set(t.toString());
arg2.write(result, arg0);
}
}

下面,我们再来看下排序组件的代码:

/***
* 按词频降序排序
* 的类
*
* **/
public static class DescSort extends  WritableComparator{
public DescSort() {
super(IntWritable.class,true);//注册排序组件
}
@Override
public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
int arg4, int arg5) {
return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序
}
@Override
public int compare(Object a, Object b) {
return   -super.compare(a, b);//注意使用负号来完成降序
}
}

main方法里面的实现代码如下所示:

public static void main(String[] args) throws Exception{

Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job=new Job(conf, "sort");
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(SortIntValueMapper.class);
job.setReducerClass(SortIntValueReducer.class);
job.setSortComparatorClass(DescSort.class);//加入排序组件
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

输出结果,如下所示:
good student3
patient2
a good student1
patient a1


至此,我们可以成功实现,统计并排序的业务,当然这种类型的需求非常多而且常见,如对某个海量日志IP的分析,散仙上面的例子使用的只是测试的数据,而真实数据是对几亿或几十亿的短语构建语料库使用,配置集群方面,可以根据自己的需求,配置集群的节点个数以及map,reduce的个数,而代码,只需要我们写好,提交给hadoop集群执行即可。
最后在简单总结一下,数据处理过程中,格式是需要提前定制好的,也就是说你得很清楚的你的格式代表什么意思,另外一点,关于hadoop的中文编码问题,这个是内部固定的UTF-8格式,如果你是GBK的文件编码,则需要自己单独在map或reduce过程中处理一下,否则输出的结果可能是乱码,最好的方法就是统一成UTF-8格式,否则,很容易出现一些编码问题的。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313006-1-1.html 上篇帖子: 分布式系统概述(Hadoop与HBase的前生今世) 下篇帖子: Cloudera Impala: Real-Time Queries in Apache Hadoop, For Real
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表