设为首页 收藏本站
查看: 961|回复: 0

[经验分享] hadoop中MapReduce多种join实现实例分析

[复制链接]

尚未签到

发表于 2018-10-31 08:51:38 | 显示全部楼层 |阅读模式
package com.mr.reduceSizeJoin;  
import java.io.IOException;
  
import java.util.ArrayList;
  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.conf.Configured;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.Job;
  
import org.apache.hadoop.mapreduce.Mapper;
  
import org.apache.hadoop.mapreduce.Reducer;
  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  
import org.apache.hadoop.util.Tool;
  
import org.apache.hadoop.util.ToolRunner;
  
import org.slf4j.Logger;
  
import org.slf4j.LoggerFactory;
  
/**
  
* @author zengzhaozheng
  
* 用途说明:
  
* reudce side join中的left outer join
  
* 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
  
* table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
  
* tb_dim_city.dat文件内容,分隔符为"|":
  
* id     name  orderid  city_code  is_show
  
* 0       其他        9999     9999         0
  
* 1       长春        1        901          1
  
* 2       吉林        2        902          1
  
* 3       四平        3        903          1
  
* 4       松原        4        904          1
  
* 5       通化        5        905          1
  
* 6       辽源        6        906          1
  
* 7       白城        7        907          1
  
* 8       白山        8        908          1
  
* 9       延吉        9        909          1
  
* -------------------------风骚的分割线-------------------------------
  
* table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
  
* tb_user_profiles.dat文件内容,分隔符为"|":
  
* userID   network     flow    cityID
  
* 1           2G       123      1
  
* 2           3G       333      2
  
* 3           3G       555      1
  
* 4           2G       777      3
  
* 5           3G       666      4
  
*
  
* -------------------------风骚的分割线-------------------------------
  
*  结果:
  
*  1   长春  1   901 1   1   2G  123
  
*  1   长春  1   901 1   3   3G  555
  
*  2   吉林  2   902 1   2   3G  333
  
*  3   四平  3   903 1   4   2G  777
  
*  4   松原  4   904 1   5   3G  666
  
*/
  
public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{
  
    private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
  
    public static class LeftOutJoinMapper extends Mapper {
  
        private CombineValues combineValues = new CombineValues();
  
        private Text flag = new Text();
  
        private Text joinKey = new Text();
  
        private Text secondPart = new Text();
  
        @Override
  
        protected void map(Object key, Text value, Context context)
  
                throws IOException, InterruptedException {
  
            //获得文件输入路径
  
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
  
            //数据来自tb_dim_city.dat文件,标志即为"0"
  
            if(pathName.endsWith("tb_dim_city.dat")){
  
                String[] valueItems = value.toString().split("\\|");
  
                //过滤格式错误的记录
  
                if(valueItems.length != 5){
  
                    return;
  
                }
  
                flag.set("0");
  
                joinKey.set(valueItems[0]);
  
                secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
  
                combineValues.setFlag(flag);
  
                combineValues.setJoinKey(joinKey);
  
                combineValues.setSecondPart(secondPart);
  
                context.write(combineValues.getJoinKey(), combineValues);
  

  
            }//数据来自于tb_user_profiles.dat,标志即为"1"
  
            else if(pathName.endsWith("tb_user_profiles.dat")){
  
                String[] valueItems = value.toString().split("\\|");
  
                //过滤格式错误的记录
  
                if(valueItems.length != 4){
  
                    return;
  
                }
  
                flag.set("1");
  
                joinKey.set(valueItems[3]);
  
                secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
  
                combineValues.setFlag(flag);
  
                combineValues.setJoinKey(joinKey);
  
                combineValues.setSecondPart(secondPart);
  
                context.write(combineValues.getJoinKey(), combineValues);
  
            }
  
        }
  
    }
  
    public static class LeftOutJoinReducer extends Reducer {
  
        //存储一个分组中的左表信息
  
        private ArrayList leftTable = new ArrayList();
  
        //存储一个分组中的右表信息
  
        private ArrayList rightTable = new ArrayList();
  
        private Text secondPar = null;
  
        private Text output = new Text();
  
        /**
  
         * 一个分组调用一次reduce函数
  
         */
  
        @Override
  
        protected void reduce(Text key, Iterable value, Context context)
  
                throws IOException, InterruptedException {
  
            leftTable.clear();
  
            rightTable.clear();
  
            /**
  
             * 将分组中的元素按照文件分别进行存放
  
             * 这种方法要注意的问题:
  
             * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,
  
             * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最
  
             * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。
  
             */
  
            for(CombineValues cv : value){
  
                secondPar = new Text(cv.getSecondPart().toString());
  
                //左表tb_dim_city
  
                if("0".equals(cv.getFlag().toString().trim())){
  
                    leftTable.add(secondPar);
  
                }
  
                //右表tb_user_profiles
  
                else if("1".equals(cv.getFlag().toString().trim())){
  
                    rightTable.add(secondPar);
  
                }
  
            }
  
            logger.info("tb_dim_city:"+leftTable.toString());
  
            logger.info("tb_user_profiles:"+rightTable.toString());
  
            for(Text leftPart : leftTable){
  
                for(Text rightPart : rightTable){
  
                    output.set(leftPart+ "\t" + rightPart);
  
                    context.write(key, output);
  
                }
  
            }
  
        }
  
    }
  
    @Override
  
    public int run(String[] args) throws Exception {
  
          Configuration conf=getConf(); //获得配置文件对象
  
            Job job=new Job(conf,"LeftOutJoinMR");
  
            job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
  

  
            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
  
            FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
  

  
            job.setMapperClass(LeftOutJoinMapper.class);
  
            job.setReducerClass(LeftOutJoinReducer.class);
  

  
            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
  
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
  

  
            //设置map的输出key和value类型
  
            job.setMapOutputKeyClass(Text.class);
  
            job.setMapOutputValueClass(CombineValues.class);
  

  
            //设置reduce的输出key和value类型
  
            job.setOutputKeyClass(Text.class);
  
            job.setOutputValueClass(Text.class);
  
            job.waitForCompletion(true);
  
            return job.isSuccessful()?0:1;
  
    }
  
    public static void main(String[] args) throws IOException,
  
            ClassNotFoundException, InterruptedException {
  
        try {
  
            int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);
  
            System.exit(returnCode);
  
        } catch (Exception e) {
  
            // TODO Auto-generated catch block
  
            logger.error(e.getMessage());
  
        }
  
    }
  
}



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-628732-1-1.html 上篇帖子: hadoop2.0 federation与HA的配置 下篇帖子: Hadoop 2.0 NameNode HA和Federation实践
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表