设为首页 收藏本站
查看: 683|回复: 0

[经验分享] hadoop之MapReduce自定义二次排序流程实例详解

[复制链接]

尚未签到

发表于 2018-10-31 09:15:09 | 显示全部楼层 |阅读模式
package com.mr;  
import java.io.IOException;
  
import java.util.Iterator;
  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.conf.Configured;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.IntWritable;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
  
import org.apache.hadoop.mapreduce.Job;
  
import org.apache.hadoop.mapreduce.Mapper;
  
import org.apache.hadoop.mapreduce.Reducer;
  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  
import org.apache.hadoop.util.Tool;
  
import org.apache.hadoop.util.ToolRunner;
  
import org.slf4j.Logger;
  
import org.slf4j.LoggerFactory;
  
/**
  
* @author zengzhaozheng
  
*
  
* 用途说明:二次排序mapreduce
  
* 需求描述:
  
* ---------------输入-----------------
  
* sort1,1
  
* sort2,3
  
* sort2,77
  
* sort2,54
  
* sort1,2
  
* sort6,22
  
* sort6,221
  
* sort6,20
  
* ---------------目标输出---------------
  
* sort1 1,2
  
* sort2 3,54,77
  
* sort6 20,22,221
  
*/
  
public class SecondSortMR extends Configured  implements Tool {
  
    private static final Logger logger = LoggerFactory.getLogger(SecondSortMR.class);
  
    public static class SortMapper extends Mapper {
  
    //---------------------------------------------------------
  
        /**
  
         * 这里特殊要说明一下,为什么要将这些变量写在map函数外边。
  
         * 对于分布式的程序,我们一定要注意到内存的使用情况,对于mapreduce框架,
  
         * 每一行的原始记录的处理都要调用一次map函数,假设,此个map要处理1亿条输
  
         * 入记录,如果将这些变量都定义在map函数里边则会导致这4个变量的对象句柄编
  
         * 程非常多(极端情况下将产生4*1亿个句柄,当然java也是有自动的gc机制的,
  
         * 一定不会达到这么多,但是会浪费很多时间去GC),导致栈内存被浪费掉。我们将其写在map函数外边,
  
         * 顶多就只有4个对象句柄。
  
         */
  
        CombinationKey combinationKey = new CombinationKey();
  
        Text sortName = new Text();
  
        IntWritable score = new IntWritable();
  
        String[] inputString = null;
  
    //---------------------------------------------------------
  
        @Override
  
        protected void map(Text key, Text value, Context context)
  
                throws IOException, InterruptedException {
  
            logger.info("---------enter map function flag---------");
  
            //过滤非法记录
  
            if(key == null || value == null || key.toString().equals("")
  
                    || value.equals("")){
  
                return;
  
            }
  
            sortName.set(key.toString());
  
            score.set(Integer.parseInt(value.toString()));
  
            combinationKey.setFirstKey(sortName);
  
            combinationKey.setSecondKey(score);
  
            //map输出
  
            context.write(combinationKey, score);
  
            logger.info("---------out map function flag---------");
  
        }
  
    }
  
    public static class SortReducer extends
  
    Reducer {
  
        StringBuffer sb = new StringBuffer();
  
        Text sore = new Text();
  
        /**
  
         * 这里要注意一下reduce的调用时机和次数:reduce每处理一个分组的时候会调用一
  
         * 次reduce函数。也许有人会疑问,分组是什么?看个例子就明白了:
  
         * eg:
  
         * {{sort1,{1,2}},{sort2,{3,54,77}},{sort6,{20,22,221}}}
  
         * 这个数据结果是分组过后的数据结构,那么一个分组分别为{sort1,{1,2}}、
  
         * {sort2,{3,54,77}}、{sort6,{20,22,221}}
  
         */
  
        @Override
  
        protected void reduce(CombinationKey key,
  
                Iterable value, Context context)
  
                throws IOException, InterruptedException {
  
            sb.delete(0, sb.length());//先清除上一个组的数据
  
            Iterator it = value.iterator();
  

  
            while(it.hasNext()){
  
                sb.append(it.next()+",");
  
            }
  
            //去除最后一个逗号
  
            if(sb.length()>0){
  
                sb.deleteCharAt(sb.length()-1);
  
            }
  
            sore.set(sb.toString());
  
            context.write(key.getFirstKey(),sore);
  
            logger.info("---------enter reduce function flag---------");
  
            logger.info("reduce Input data:{["+key.getFirstKey()+","+
  
            key.getSecondKey()+"],["+sore+"]}");
  
            logger.info("---------out reduce function flag---------");
  
        }
  
    }
  
    @Override
  
    public int run(String[] args) throws Exception {
  
        Configuration conf=getConf(); //获得配置文件对象
  
        Job job=new Job(conf,"SoreSort");
  
        job.setJarByClass(SecondSortMR.class);
  

  
        FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
  
        FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
  

  
        job.setMapperClass(SortMapper.class);
  
        job.setReducerClass(SortReducer.class);
  

  
        job.setPartitionerClass(DefinedPartition.class); //设置自定义分区策略
  

  
        job.setGroupingComparatorClass(DefinedGroupSort.class); //设置自定义分组策略
  
        job.setSortComparatorClass(DefinedComparator.class); //设置自定义二次排序策略
  

  
        job.setInputFormatClass(KeyValueTextInputFormat.class); //设置文件输入格式
  
        job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
  

  
        //设置map的输出key和value类型
  
        job.setMapOutputKeyClass(CombinationKey.class);
  
        job.setMapOutputValueClass(IntWritable.class);
  

  
        //设置reduce的输出key和value类型
  
        job.setOutputKeyClass(Text.class);
  
        job.setOutputValueClass(Text.class);
  
        job.waitForCompletion(true);
  
        return job.isSuccessful()?0:1;
  
    }
  

  
    public static void main(String[] args) {
  
        try {
  
            int returnCode =  ToolRunner.run(new SecondSortMR(),args);
  
            System.exit(returnCode);
  
        } catch (Exception e) {
  
            // TODO Auto-generated catch block
  
            e.printStackTrace();
  
        }
  

  
    }
  
}



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-628763-1-1.html 上篇帖子: 生产环境下Hadoop大集群安装与配置+DNS+NFS-DavideyLee 下篇帖子: hadoop伪分布式的安装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表