设为首页 收藏本站
查看: 1190|回复: 0

[经验分享] Hadoop入门第二篇

[复制链接]

尚未签到

发表于 2017-12-17 06:42:29 | 显示全部楼层 |阅读模式

  mapreduce是一种计算模型,是google的一篇论文向全世界介绍了MapReduce。MapReduce其实可以可以用多种语言编写Map或Reduce程序,因为hadoop是java写的,所以通常情况下我们都是选择java编程语言。其实mr的编写格式或者说语法要求很简单,其实复杂的是我们要学会利用这个模型,将问题分解计算。

MapReduce计算模型

  MapReduce Job
  每个mr任务都被初始化成一个job,后续我们在编写自己的第一个mr任务的时候也会感受到。每个job分为Map阶段和reduce阶段,其实绝大部分情况我们还有combiner,这个combiner具体是什么在后续关于wordcount第一个mr任务的时候再介绍,这样比较好理解。Map函数接收一个<k1,v1>形式输入,输出<key,value>然后hadoop会将所有相同的中间key的value值进行合并,格式类似<key,list(values)>作为reduce的输入,大致过程如下:
  INPUT   =====>   K1,V1  ======>(MAP)    K2,V2    ======>(REDUCE)   K3,V3  =====>OUTPUT

  

  直接动手第一个程序 WordCount
  wordcount任务是计算文件中每个单词出现的次数,类似:select count(1) as total,word from words group by word.
  

package com.wangke.hadoop.example;  

  

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.Job;  

import org.apache.hadoop.mapreduce.Mapper;  

import org.apache.hadoop.mapreduce.Reducer;  

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

  

import java.io.IOException;  

import java.util.StringTokenizer;  

  

public>
public static>private final static IntWritable one = new IntWritable(1);private Text word = new Text();  

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {  StringTokenizer itr
= new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {  word.set(itr.nextToken());
  context.write(word, one);
  }
  }
  }
  

public static>private IntWritable result = new IntWritable();  

public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0 ;for (IntWritable value : values) {  sum
+= value.get();  }
  result.set(sum);
  context.write(key,result);
  }
  }
  

public static void main(String[] args) throws IOException,>Configuration conf = new Configuration();  Job job
= Job.getInstance(conf,"wordcount");  job.setJarByClass(WordCount.
class);  job.setMapperClass(TokenizerMapper.
class);  job.setCombinerClass(IntSumReducer.
class);  job.setReducerClass(IntSumReducer.
class);  job.setOutputKeyClass(Text.
class);  job.setOutputValueClass(IntWritable.
class);  FileInputFormat.addInputPath(job,
new Path(args[0]));  FileOutputFormat.setOutputPath(job,
new Path(args[1]));  System.exit(job.waitForCompletion(
true)?0:1);  }
  
}
  

  接下来的步骤,maven打成jar包,拷贝到你部署hadoop的机器,执行以下相关命令(我用的是2.8.1)


  •   

    $ bin/hdfs namenode -format  

    sbin/start-dfs.sh  


  •   

    $ bin/hdfs dfs -mkdir input  $ bin/hdfs dfs -put etc/hadoop/*.xml input
      


  • bin/hadoop jar study-1.0-SNAPSHOT.jar com.wangke.hadoop.example.WordCount input output
  • bin/hdfs dfs -cat output/*   (查看结果)
  关于此小程序的一些介绍


  •   Mapper和Reduce都是直接继承抽象类 Mapper,,Reducer,这两个之所以是抽象类,因为我们自己写的Mapper/Reducer真正实现过程是抽象父类实现的
  setup函数:called once at the start of the task
  cleanup函数:called once at the end of the task
  run函数:run函数的过程其实map或者reduce的执行过程,可以看源代码就知道,这能够帮助我们更好的去理解map或者reduce函数是怎么执行和得到结果的
  

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {this.setup(context);  

try {while(context.nextKeyValue()) {this.map(context.getCurrentKey(), context.getCurrentValue(), context);  }
  }
finally {this.cleanup(context);  }
  



  • InputSplite  InputFormat  OutputFormat
  InputSplit是Hadoop中把输入数据传送给每个单独的Map,InputSplite存储的并非数据本身,而是一个分片长度和以及记录数据位置的数组。而生成InputSplit的方式时可以通过Inputformat()来设置,所以InputFormat()方法是用来生成可以提供Map处理的<key,value>的。常见的InputFormat子类:
  BileyBorweinPlouffe.BbpInputFormat  DBInputFormat  ComposableInputFormat  FileInputFormat(CombineFileInputFormat,KeyValueTextInputFormat,TextInputFormat)等等
  OutputFormat是跟InputFormat相对应的,也就是mr结果的输出格式,我们示例中就是FileOutputFormat.


  • Map ,Reduce数据流和控制流以及示例代码中的  job.setCombinerClass(IntSumReducer.class)中的Combiner是什么
DSC0000.png

  这个是WordCount数据流程图,FileInputStream被处理形成两个InputSplit,然后输入到两个Map中(一般计算和存储都是在同一机器上,避免数据传输,也就是移动计算,避免移动存储),Map输出的结果是直接写在磁盘上的,而不是HDFS。而Reduce这时候会读取Map的输出数据,将结果写到HDFS。在Reduce过程中时无法避免数据传输的,这时候你应该会想到为了避免过多的数据传输,我们会将每个Map的结果先局部的做一个WordCount,这个过程就是我们的Combiner了,所以Combiner其实也就是一个reducer过程。

MapReduce的优化与性能调优
  MapReduce计算模型的优化主要集中在两个方面:计算性能方面的优化,I/O操作方面的优化。

优化的几个方面:


  • 任务调度:任务分配给空闲的机器;尽量将Map任务分配给InputSplit所在的机器,移动计算来减少网络I/O
  • 数据预处理与InputSplit的大小:Hadoop擅长处理少量的大数据而不是处理大量的小数据。,在MaxCompute使用时,我们也会发现在执行task前 会有一步合并小文件的步骤。
  • Map和Reduce任务的数量:调整相关参数,设置map和reduce的数量
  • 上节中提到的Combine函数
  • 压缩:对Map和最终的输出结果进行压缩,其中压缩算法是可以配置的
  • 自定义comparator:在hadoop中可以自定义数据类型
  • 过滤数据以及数据倾斜:通过数据过滤可以降低数据规模。在MaxCompute中我们知道用mapjoin来解决大表和小表关联的性能问题,在hadoop中我们用Bloom Filter来解决类似问题,关于BloomFilter我们在下一小节具体介绍
Bloom Filter:
  Bloom Filter是由Howard Bloom提出的二进制向量数据结构。在保存所有集合元素特征的同时,他能在保证高效空间效率和一定出错率的前提下迅速检测一个元素是不是集合中的成员。怎样去利用Bloom Filter去解决大表和小表的内连接呢,也就是怎么实现Maxcompute的mapjoin。
  先创建BloomFilter对象,将小表中所有连接列上的值都保存在Bloom Filter中(数据量小,直接加载在内存中),然后开始通过mr执行内连接。在map阶段,读小表的数据直接以连接列值为key,以数据未value的<key,value>;读大表数据时,在输出前先判断当前元祖的连接列值是否在BF中,如果不存在就不需要输出,如果存在 就采用与小表同样的方式输出,对于不在集合内的元素一定是判断正确,这样就可以过滤掉不需要的数据。最后在reduce阶段,针对每个连接列值连接两个表的元组并输出结果。
  BF实现原理:他有两个重要接口add() ,membershipTest(),add负责保存集合元素的特征到位数组,membershipTest判断某个值是否是集合中的元素。BF利用k个相互独立的Hash函数将集合中的每个元素迎神到(1,2,..,M)个范围内,这时候就相当于是一个k维特征向量,如果判断一个元素是否存在(也就是该元素与小表中的某个元素相等),其实看他们的向量是否一样,因为hash的特性,A  != B,但是对应的向量是一样的,这种情况很少见,所以会有少量错误率,但是性能大大提高。我们如果增加k和M时可以降低错误率的

Map ReduceJob中全局共享数据


  • 读写HDFS文件:Map task和Reduce task甚至是不同的Job都可以通过读写HDFS中预定好的同一个文件共享全局变量
  • 配置Job属性:在MapReduce执行过程中,task可以读取Job的属性,基于这个特性,大家可以在任务启动之初利用Cofiguration类中的set(name,value)将一些曲剧变量封装进去,然后通过get方法读取。
  • 使用distributedCache:这个是MapReduce为应用提供缓存文件的只读工具。在使用时,用户可以在作业配置时使用本地或者HDFS文件的url来将其设置成共享缓存文件。task启动之前,MR框架会将缓存文件复制到执行任务节点的本地。优点是每个Job共享文件只会启动之后复制一次,且适用于大量的共享数据,缺点就是只读。
链接MapReduce Job
  在日常数据处理过程中,并不是一个mr就能解决问题的,需要多个mr作业配合完成,其实实际中我们更多的是利用一个mapper多个reducer。


  • 线性MapReduce Job流:就是多个Job按照一定顺序,第一个job的输出作为后面一个Job的输入。缺点:顺序结构,且流程不好控制
  • 复杂MapReduce Job流:例如 Job3需要Job1  和Job2的结果,其实MR框架提供了ControlledJob和JobControl类控制,具体不介绍了。
本地测试
  对于写好一个mrJob ,不能每次都通过打包放到集群测试,这样效率太低,Score_Process类继承与Configure的实现接口Toll,通过run方法可以实现对程序进行测试。
  

import com.wangke.hadoop.example.WordCount;  

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.Job;  

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

import org.apache.hadoop.util.Tool;  

import org.apache.hadoop.util.ToolRunner;  

  

public>public int run(String[] args) throws Exception {  Configuration conf
= new Configuration();  Job job
= Job.getInstance(conf,"wordcount");  job.setJarByClass(WordCount.
class);  job.setMapperClass(WordCount.TokenizerMapper.
class);  job.setCombinerClass(WordCount.IntSumReducer.
class);  job.setReducerClass(WordCount.IntSumReducer.
class);  job.setOutputKeyClass(Text.
class);  job.setOutputValueClass(IntWritable.
class);  FileInputFormat.addInputPath(job,
new Path(args[0]));  FileOutputFormat.setOutputPath(job,
new Path(args[1]));boolean success = job.waitForCompletion(true);return success ? 0:1;  }
  

public static void main(String[] args) throws  Exception {int ret = ToolRunner.run(new ScoreTest(),args);  System.exit(ret);
  }
  

public void setConf(Configuration configuration) {  

  }
  

public Configuration getConf() {return null;  }
  
}
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-424893-1-1.html 上篇帖子: Hadoop单机Hadoop测试环境搭建 下篇帖子: Centos7中hadoop配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表