设为首页 收藏本站
查看: 740|回复: 0

[经验分享] Hadoop小小笔记

[复制链接]

尚未签到

发表于 2015-7-13 11:31:25 | 显示全部楼层 |阅读模式
  >> 偶遇JobClient
  这两年在在整一个云计算的东西,但工作主要集中在Client端。
  对Hadoop早有耳闻,但一直没有机会,前几天看到了JobClient这个熟悉的字眼,所以就把Hadoop的源代码拖来,找个机会看看。倒不是想用Hadoop干什么事情,了解了解,免得“云深不知处”。
  虽然Hadoop是用Java开发的,但问题不大,基本上能看懂。Hadoop当然是博大精深,包含了conf/DFS/io/ipc/MapReduce几个部分,但我也只是挑了MapReduce的代码作为观摩对象:
  感兴趣的文件夹:
  ...\src\mapred\org\apache\hadoop\mapred
  ...\src\mapred\org\apache\hadoop\mapreduce
  感兴趣的类:
  JobTracker/TaskTracker/
  JobID/JobProfile/JobContext
  JobInProgress/TaskInProgress/MapTask/ReduceTask
  JobHistory/JobHistoryServer
  
  >> 关于MapReduce
  MapReduce模型隐藏了并行化,容错,位置优化和负载均衡的细节,使用起来比较简洁。
   1. MapReduce == Map -> Combine -> Reduce
  Map-Reduce框架的运作完全基于对,也就是说数据的输入是一批对,生成的结果也是一批对,只是有时候它们的类型不一样而已。
  由于Key和value的类需要支持被序列化操作,它们必须要实现Writable接口。此外,key的类还必须实现WritableComparable接口,以便可以让框架对数据集的执行排序操作。
  一个Map-Reduce任务的执行过程以及数据输入输出的类型如下所示:
  (input) -> map ->  -> combine ->  -> reduce -> (output)


  
   2. 例子: WordCount 1.0
  MapReduce Tutorial中有一个WordCount的例子,要求从读取两个文本文件并计算文本中每个单词的总数。
  
  源代码:



package org.myorg;  
import java.io.IOException;  
import java.util.*;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.conf.*;  
import org.apache.hadoop.io.*;  
import org.apache.hadoop.mapred.*;  
import org.apache.hadoop.util.*;  
public class WordCount {  
    // Mapper之Map方法
    public static class Map extends MapReduceBase implements Mapper {  
      private final static IntWritable one = new IntWritable(1);  
      private Text word = new Text();  
      public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {  
        String line = value.toString();  
        StringTokenizer tokenizer = new StringTokenizer(line);  
        while (tokenizer.hasMoreTokens()) {  
          word.set(tokenizer.nextToken());  
          output.collect(word, one);  
        }  
      }  
    }  
     // Reducer之Reduce方法
    public static class Reduce extends MapReduceBase implements Reducer {  
      public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {  
        int sum = 0;  
        while (values.hasNext()) {  
          sum += values.next().get();  
        }  
        output.collect(key, new IntWritable(sum));  
      }  
    }  
    public static void main(String[] args) throws Exception {  
      // Job Configuraion
      JobConf conf = new JobConf(WordCount.class);  
      conf.setJobName("wordcount");  
      conf.setOutputKeyClass(Text.class);  
      conf.setOutputValueClass(IntWritable.class);  
      // 设置Mapper/Combiner/Reducer
      conf.setMapperClass(Map.class);  
      conf.setCombinerClass(Reduce.class);  
      conf.setReducerClass(Reduce.class);  
      // 设置输入/输出的格式,此处均为Text
      conf.setInputFormat(TextInputFormat.class);  
      conf.setOutputFormat(TextOutputFormat.class);  
      FileInputFormat.setInputPaths(conf, new Path(args[0]));  
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
      // 运行Job
      JobClient.runJob(conf);  
    }  
}  
  
  Inputs(file01 & file02):
  -------------------------------------------------------------------
  ../wordcount/input/file01:     Hello World Bye World
../wordcount/input/file02:     Hello Hadoop
Goodbye Hadoop
  -------------------------------------------------------------------

  Output:
  ------------------
      Bye 1
    Goodbye 1
    Hadoop 2
    Hello 2
    World 2
  ------------------
  
  Workflow:
  Step1: Mapper
  Mapper通过map方法每次处理一行文本,然后利用StringTokenizer将其分离成Tokens,然后就将键值对< , 1>输出,它将作为Combine的输入。
  ----------------------------
  the first map emits:
<
Hello, 1>
< World, 1>
< Bye, 1>
< World,
1>
  The second map emits:
< Hello, 1>
< Hadoop, 1>
<
Goodbye, 1>
< Hadoop, 1>
  -----------------------------

  Step2: Combine

  在WordCount这个例子中,Combiner与Reducer是一样的,Combiner类负责将相同key的值合并起来。
  ----------------------------------
  The output of the first map:
< Bye, 1>
< Hello, 1>
<
World, 2>
  The output of the second map:
< Goodbye,
1>
< Hadoop, 2>
< Hello, 1>
  ----------------------------------
  Step3: Reduce
  Reducer类通过reduce方法,计算每个单词的总数,从而得到最终的输出。
  -----------------------------------
  Thus the output of the job is:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello,
2>
< World, 2>
  -----------------------------------
  
  >> MapReduce Architecture
  --------------------------------------------------------------------------------------------
DSC0000.png
  --------------------------------------------------------------------------------------------
  
  >> JobClient
  每一个job都会在用户端通过JobClient类将应用程序以及配置参数Configuration打包成jar文件存储在HDFS,并把路径提交到 JobTracker的master服务,然后由master创建每一个Task(即MapTask和ReduceTask)将它们分发到各个 TaskTracker服务中去执行。
  Methods:

  JobClient.runJob()
  JobClient.submitJob
  JobClient.killJob()
  
  >> JobTracker
它们都是由一个master服务JobTracker和多个运行于多个节点的slaver服务TaskTracker两个类提供的服务调度的。 master负责调度job的每一个子任务task运行于slave上,并监控它们,如果发现有失败的task就重新运行它,slave则负责直接执行每 一个task。TaskTracker都需要运行在HDFS的DataNode上,而JobTracker则不需要,一般情况应该把JobTracker 部署在单独的机器上。
  JobTracker is a daemon per pool that administers all aspects of mapred activities.
JobTracker keeps all the current jobs by containing instances of JobInProgress.
Methods:

JobTracker.submitJob(): creates/adds a JobInProgress to jobs and jobsByArrival
JobTracker.pollForNewTask()
  
  >> JobInProgress/TaskInProgress

   JobInProgress represents a job as it is being tracked by JobTracker.

   TaskInProgress represents a set of tasks for a given unique input, where input is a split for map task or a partition for reduce task.
  
  >> MapTask/ReduceTask:
MapTask offers method run() that calls MapRunner.run(), which in turn calls the user-supplied Mapper.map().
ReduceTask offers run() that sorts input files using SequenceFile.Sorter.sort(), and then calls user-supplied Reducer.reduce().
  
  >> 其他
  Hadoop的Task Recovery机制还是比较有意思的,它可以重新尝试运行失败的Task,具体可以看看JobTracker.RecoveryManager。
  
  // I should borrow some concept of Hadoop to SolidMCP
//    RunningJob
//    Reporter
//    JobClient
//    JobHistory.HistoryCleaner
//    JobHistory.JobInfo
//    JobHistory.Listener
//    JobProfile
//    TaskReport
//    TaskTracker
//    TaskLog
//    JobQueueInfo
//    JobContext
//    JobEndNotifier
//    JobControl

  
  References:
  http://wiki.apache.org/hadoop/HadoopMapRedClasses
  http://sebug.net/paper/databases/nosql/Nosql.html
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-86286-1-1.html 上篇帖子: Hadoop学习系列之简单的带词频统计的倒排索引实现 下篇帖子: NewSQL体系比Hadoop更具效率
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表