设为首页 收藏本站
查看: 1868|回复: 0

[经验分享] hadoop debug 以及ReduceTask的部分分析

[复制链接]

尚未签到

发表于 2016-12-9 10:17:14 | 显示全部楼层 |阅读模式
分析基于hadoop-0.19.2
MapTask和ReduceTask的入口是
org.apache.hadoop.mapred.Child.main(String[] args){ }
传入的args举例如下:
//args = [127.0.0.1, 57354, attempt_201107272049_0001_m_000003_0, 497563501]
//args = [127.0.0.1, 41819, attempt_201107280338_0001_r_000000_1, -2017927773]
    String host = args[0];  // ip
    int port = Integer.parseInt(args[1]); //端口号
    InetSocketAddress address = new InetSocketAddress(host, port); //用于子进程跟TaskTracker通信用,接口是TaskUmbilicalProtocol。使用如下:
        TaskUmbilicalProtocol umbilical =
      (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
          TaskUmbilicalProtocol.versionID,
          address,
          defaultConf);
      attempt_201107272049_0001_m_000003_0
      // 201107272049_0001用于确定JobId,JobId=jobId = job_201107272049_0001
      // m 说明是Map,r说明是Reduce。
      //  其他id 还不清楚。
     

org.apache.hadoop.mapred.Child.main 被org.apache.hadoop.mapred.TaskRunner调用、启动,具体见TaskRunner的run方法。
在TaskRunner.run方法里:
String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
String [] javaOptsSplit = javaOpts.split(" ");
for (int i = 0; i < javaOptsSplit.length; i++) {
        vargs.add(javaOptsSplit);
}
默认运行map和reduce的子进程是200m。
mapred.child.java.opts这个属性是只有运行map和reduce的子进程用,所以我们可以传入一下其他参数,例如:
在hadoop-site.xml里加入:
<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx200m -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y</value>
</property>
这样子进程JVM运行的时候就会在8000端口监听,可以通过Eclipse远程debug进去,跟踪MapTask和ReduceTask的运行。
还需要改下其他参数比较好:
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>1</value>
  <description>The maximum number of map tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>1</value>
  <description>The maximum number of reduce tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>

<property>
  <name>mapred.task.timeout</name>
  <value>600000</value>
  <description>The number of milliseconds before a task will be
  terminated if it neither reads an input, writes an output, nor
  updates its status string.
  </description>
</property>
因为我是在自己的虚拟机上启动的一个hadoop集群,所有的进程都运行在一个机器上,任何时候都只能有一个进程在8000端口监听。所以设置一下同时最后只有一个 map或者一个reduce调度运行,mapred.task.timeout默认是10分钟超时,如果运行map或者reduce的子进程没有向TaskTracker上报信息或者心跳,就会被kill,debug过程需要的时间比较长,可以一个线程一个线程debug.


运行ReduceTask:
org.apache.hadoop.mapred.Child.main(String[] args){
    Task task = null;
    JvmTask myTask = umbilical.getTask(jvmId); // 子进程通过TaskUmbilicalProtocol从父进程TaskTracker获得一个Task的信息。
    task = myTask.getTask();     
    task.run(job, umbilical);             // run the task
}
Task  task 这个Task运行map时是MapTask,在运行reduce时是ReduceTask.
ReduceTask.run(JobConf job, final TaskUmbilicalProtocol umbilical){

    // start thread that will handle communication with parent
    startCommunicationThread(umbilical);

    if (!isLocal) {
      reduceCopier = new ReduceCopier(umbilical, job);
      if (!reduceCopier.fetchOutputs()) {
      }
    }
   
    copyPhase.complete();                         // copy is already complete

    RawKeyValueIterator rIter = isLocal
      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
          !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
          new Path(getTaskID().toString()), job.getOutputKeyComparator(),
          reporter)
      : reduceCopier.createKVIterator(job, rfs, reporter);
   
    sortPhase.complete();                         // sort is complete
   
    // make output collector
    String finalName = getOutputName(getPartition());

    FileSystem fs = FileSystem.get(job);

    final RecordWriter out =
      job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
   
    OutputCollector collector = new OutputCollector() {
        public void collect(Object key, Object value)
          throws IOException {
          out.write(key, value);
          reduceOutputCounter.increment(1);
          // indicate that progress update needs to be sent
          reporter.progress();
        }
      };
      // collector 直接往HDFS写。

      // apply reduce function
      Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
      // 这里的Reducer就是实现的Reducer类,如用hive提交的job就是org.apache.hadoop.hive.ql.exec.ExecReducer
      
      while (values.more()) { //最主要的循环处理
        reduceInputKeyCounter.increment(1);
        reducer.reduce(values.getKey(), values, collector, reporter);//对数据进行reduce
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
              SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
        }
        values.nextKey();
        values.informReduceProgress();
      }

      done(umbilical); //reduce处理完了通过umbilical.done(getTaskID());通知TaskTracker。
}


new的线程:按时间顺序
(1)在Child.main  71行中创建线程Thread t = new Thread() 用于同步日志:
        //every so often wake up and syncLogs so that we can track
        //logs of the currently running task
(2)在ReduceTask.run方法开始运行时调用startCommunicationThread,见Task的startCommunicationThread(final TaskUmbilicalProtocol umbilical)方法,new了一个pingProgressThread线程,该线程检查ReduceTask是否有状态如进度有变化,如有就向TaskTracker上报,如没有向TaskTracker发送心跳,保活。
(3)copy阶段有多种多个线程:
   主要是在reduceCopier.fetchOutputs()中调用。
  (3.1)MapOutputCopier,copying threads默认5个
  (3.2)LocalFSMerger,on-disk-merge thread线程,1个,merge磁盘上的文件
  (3.3)InMemFSMergeThread,in memory merger thread,1个,merge内存的文件
  一些线程的角色和分工是:
  主线程从TaskTracker获得MapTask完成的事件信息,构造获得Map输出结果的URL放入队列mapLocations。TaskTracker是从JobTracker获得MapTask已完成的信息。
    主线程遍历mapLocations把这些任务移到队列scheduledCopies中。
    每个MapOutputCopier互斥的从队列scheduledCopies获取任务,如果没有就等待在队列上,如果获得一个通过http请求从TaskTracker获得那部分map output。
    LocalFSMerger把MapOutputCopier获取过来放到disk上的map output进行归并。
    InMemFSMergeThread合并内存上的map output。



Map:
(1)Read (reading map inputs)
(2)Map (map function processing)
(3)Collect (serializing to buff er and partitioning)
(4)Spill (sorting, combining, compressing, and writing map outputs to local disk)
(5)Merge (merging sorted spill files)
Reduce:
(1)Shuffle (transferring map outputs to reduce tasks, with decompression if needed)
(2)Merge (merging sorted map outputs)
(3)Reduce (reduce function processing)
(4)Write(writing reduce outputs to the distributed fi le-system)

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-311844-1-1.html 上篇帖子: Hadoop集群网络性能优化:Hadoop机架感知实现及配置 下篇帖子: Hadoop Streaming 之 awk实现Map/Reduce
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表