设为首页 收藏本站
查看: 1262|回复: 0

[经验分享] 分布式计算开源框架Hadoop入门实践(三)

[复制链接]
发表于 2018-10-30 10:52:01 | 显示全部楼层 |阅读模式
Hadoop基本流程


  一个图片太大了,只好分割成为两部分。根据流程图来说一下具体一个任务执行的情况。

  •   在分布式环境中客户端创建任务并提交。
  •   InputFormat做Map前的预处理,主要负责以下工作:

    •   验证输入的格式是否符合JobConfig的输入定义,这个在实现Map和构建Conf的时候就会知道,不定义可以是Writable的任意子类。
    •   将input的文件切分为逻辑上的输入InputSplit,其实这就是在上面提到的在分布式文件系统中blocksize是有大小限制的,因此大文件会被划分为多个block。
    •   通过RecordReader来再次处理inputsplit为一组records,输出给Map。(inputsplit只是逻辑切分的第一步,但是如何根据文件中的信息来切分还需要RecordReader来实现,例如最简单的默认方式就是回车换行的切分)

  •   RecordReader处理后的结果作为Map的输入,Map执行定义的Map逻辑,输出处理后的key和value对应到临时中间文件。
  •   Combiner可选择配置,主要作用是在每一个Map执行完分析以后,在本地优先作Reduce的工作,减少在Reduce过程中的数据传输量。
  •   Partitioner可选择配置,主要作用是在多个Reduce的情况下,指定Map的结果由某一个Reduce处理,每一个Reduce都会有单独的输出文件。(后面的代码实例中有介绍使用场景)
  •   Reduce执行具体的业务逻辑,并且将处理结果输出给OutputFormat。
  •   OutputFormat的职责是,验证输出目录是否已经存在,同时验证输出结果类型是否如Config中配置,最后输出Reduce汇总后的结果。
业务场景和代码范例
  业务场景描述:可设定输入和输出路径(操作系统的路径非HDFS路径),根据访问日志分析某一个应用访问某一个API的总次数和总流量,统计后分别输出到两个文件中。这里仅仅为了测试,没有去细分很多类,将所有的类都归并于一个类便于说明问题。

  测试代码类图
  LogAnalysiser就是主类,主要负责创建、提交任务,并且输出部分信息。内部的几个子类用途可以参看流程中提到的角色职责。具体地看看几个类和方法的代码片断:
  LogAnalysiser::MapClass
    public static>        implements Mapper  {
  public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter)
  throws IOException
  {
  String line = value.toString();//没有配置RecordReader,所以默认采用line的实现,key就是行号,value就是行内容
  if (line == null || line.equals(""))
  return;
  String[] words = line.split(",");
  if (words == null || words.length < 8)
  return;
  String appid = words[1];
  String apiName = words[2];
  LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
  Text record = new Text();
  record.set(new StringBuffer("flow::").append(appid)
  .append("::").append(apiName).toString());
  reporter.progress();
  output.collect(record, recbytes);//输出流量的统计结果,通过flow::作为前缀来标示。
  record.clear();
  record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
  output.collect(record, new LongWritable(1));//输出次数的统计结果,通过count::作为前缀来标示
  }
  }
  LogAnalysiser:: PartitionerClass
    public static>    {  public int getPartition(Text key, LongWritable value, int numPartitions)
  {
  if (numPartitions >= 2)//Reduce 个数,判断流量还是次数的统计分配到不同的Reduce
  if (key.toString().startsWith("flow::"))
  return 0;
  else
  return 1;
  else
  return 0;
  }
  public void configure(JobConf job){}
  }
  LogAnalysiser:: CombinerClass
  参看ReduceClass,通常两者可以使用一个,不过这里有些不同的处理就分成了两个。在ReduceClass中蓝色的行表示在CombinerClass中不存在。
  LogAnalysiser:: ReduceClass
    public static>        implements Reducer  {
  public void reduce(Text key, Iterator values,
  OutputCollector output, Reporter reporter)throws IOException
  {
  Text newkey = new Text();
  newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
  LongWritable result = new LongWritable();
  long tmp = 0;
  int counter = 0;
  while(values.hasNext())//累加同一个key的统计结果
  {
  tmp = tmp + values.next().get();
  counter = counter +1;//担心处理太久,JobTracker长时间没有收到报告会认为TaskTracker已经失效,因此定时报告一下
  if (counter == 1000)
  {
  counter = 0;
  reporter.progress();
  }
  }
  result.set(tmp);
  output.collect(newkey, result);//输出最后的汇总结果
  }
  }
  LogAnalysiser
public static void main(String[] args)  {
  try
  {
  run(args);
  } catch (Exception e)
  {
  e.printStackTrace();
  }
  }
  public static void run(String[] args) throws Exception
  {
  if (args == null || args.length = 0)
  shortin = shortin.substring(shortin.lastIndexOf(File.separator));
  if (shortout.indexOf(File.separator) >= 0)
  shortout = shortout.substring(shortout.lastIndexOf(File.separator));
  SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
  shortout = new StringBuffer(shortout).append("-")
  .append(formater.format(new Date())).toString();
  if (!shortin.startsWith("/"))
  shortin = "/" + shortin;
  if (!shortout.startsWith("/"))
  shortout = "/" + shortout;
  shortin = "/user/root" + shortin;
  shortout = "/user/root" + shortout;
  File inputdir = new File(inputpath);
  File outputdir = new File(outputpath);
  if (!inputdir.exists() || !inputdir.isDirectory())
  {
  System.out.println("inputpath not exist or isn't dir!");
  return;
  }
  if (!outputdir.exists())
  {
  new File(outputpath).mkdirs();
  }
  JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//构建Config
  FileSystem fileSys = FileSystem.get(conf);
  fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//将本地文件系统的文件拷贝到HDFS中
  conf.setJobName("analysisjob");
  conf.setOutputKeyClass(Text.class);//输出的key类型,在OutputFormat会检查
  conf.setOutputValueClass(LongWritable.class); //输出的value类型,在OutputFormat会检查
  conf.setMapperClass(MapClass.class);
  conf.setCombinerClass(CombinerClass.class);
  conf.setReducerClass(ReduceClass.class);
  conf.setPartitionerClass(PartitionerClass.class);
  conf.set("mapred.reduce.tasks", "2");//强制需要有两个Reduce来分别处理流量和次数的统计
  FileInputFormat.setInputPaths(conf, shortin);//hdfs中的输入路径
  FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中输出路径
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(conf);
  Date end_time = new Date();
  System.out.println("Job ended: " + end_time);
  System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
  //删除输入和输出的临时文件
  fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
  fileSys.delete(new Path(shortin),true);
  fileSys.delete(new Path(shortout),true);
  }
  以上的代码就完成了所有的逻辑性代码,然后还需要一个注册驱动类来注册业务Class为一个可标示的命令,让hadoop jar可以执行。
public>  public static void main(String argv[]){  ProgramDriver pgd = new ProgramDriver();
  try {
  pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
  pgd.driver(argv);
  }
  catch(Throwable e){
  e.printStackTrace();
  }
  }
  }
  将代码打成jar,并且设置jar的mainClass为ExampleDriver这个类。在分布式环境启动以后执行如下语句:
hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out  在/home/wenchu/test-in中是需要分析的日志文件,执行后就会看见整个执行过程,包括了Map和Reduce的进度。执行完毕会在/home/wenchu/test-out下看到输出的内容。有两个文件:part-00000和part-00001分别记录了统计后的结果。如果需要看执行的具体情况,可以看在输出目录下的_logs/history/xxxx_analysisjob,里面罗列了所有的Map,Reduce的创建情况以及执行情况。在运行期也可以通过浏览器来查看Map,Reduce的情况:http://MasterIP:50030/jobtracker.jsp
Hadoop集群测试
  首先这里使用上面的范例作为测试,也没有做太多的优化配置,这个测试结果只是为了看看集群的效果,以及一些参数配置的影响。
  文件复制数为1,blocksize 5M
Slave数处理记录数(万条)执行时间(秒)295382950337495244950178695216950114  Blocksize 5M
Slave数处理记录数(万条)执行时间(秒)2(文件复制数为1)9503372(文件复制数为3)9503396(文件复制数为1)9501146(文件复制数为3)950117  文件复制数为1
Slave数处理记录数(万条)执行时间(秒)6(blocksize 5M)95216(blocksize 77M)95264(blocksize 5M)9501784(blocksize 50M)950546(blocksize 5M)9501146(blocksize 50M)950446(blocksize 77M)95074  测试的数据结果很稳定,基本测几次同样条件下都是一样。通过测试结果可以看出以下几点:

  •   机器数对于性能还是有帮助的(等于没说^_^)。
  •   文件复制数的增加只对安全性有帮助,但是对于性能没有太多帮助。而且现在采取的是将操作系统文件拷贝到HDFS中,所以备份多了,准备的时间很长。
  •   blocksize对于性能影响很大,首先如果将block划分的太小,那么将会增加job的数量,同时也增加了协作的代价,降低了性能,但是配置的太大也会让job不能最大化并行处理。所以这个值的配置需要根据数据处理的量来考虑。
  •   最后就是除了这个表里面列出来的结果,应该去仔细看输出目录中的_logs/history中的xxx_analysisjob这个文件,里面记录了全部的执行过程以及读写情况。这个可以更加清楚地了解哪里可能会更加耗时。
随想
  “云计算”热的烫手,就和SAAS、Web2及SNS等一样,往往都是在搞概念,只有真正踏踏实实的大型互联网公司,才会投入人力物力去研究符合自己的分布式计算。其实当你的数据量没有那么大的时候,这种分布式计算也就仅仅只是一个玩具而已,只有在真正解决问题的过程中,它深层次的问题才会被挖掘出来。
  这三篇文章(分布式计算开源框架Hadoop介绍,Hadoop中的集群配置和使用技巧)仅仅是为了给对分布式计算有兴趣的朋友抛个砖,要想真的掘到金子,那么就踏踏实实的去用、去想、去分析。或者自己也会更进一步地去研究框架中的实现机制,在解决自己问题的同时,也能够贡献一些什么。
  前几日看到有人跪求成为架构师的方式,看了有些可悲,有些可笑,其实有多少架构师知道什么叫做架构?架构师的职责是什么?与其追求这么一个名号,还不如踏踏实实地做块石头沉到水底。要知道,积累和沉淀的过程就是一种成长。



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-628391-1-1.html 上篇帖子: 分布式计算开源框架Hadoop入门实践(二) 下篇帖子: Hadoop学习基础之三:MapReduce
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表