
[Experience Sharing] Hadoop Study Notes, Part 4: An Anatomy of the Map-Reduce Process

1. The Client
  The Map-Reduce process begins with a client submitting a job.
  Job submission is implemented mainly by the static method JobClient.runJob(JobConf):
  public static RunningJob runJob(JobConf job) throws IOException {
  // First create a JobClient object
  JobClient jc = new JobClient(job);
  ……
  // Call submitJob to submit the job
  running = jc.submitJob(job);
  JobID jobId = running.getID();
  ……
  while (true) {
  // The while loop repeatedly fetches the job's status and prints it to the client console
  }
  return running;
  }
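  For orientation, here is a minimal driver in the old mapred API that would end up in this runJob path. It is a hypothetical example, not part of the original article; WordCountMapper and WordCountReducer are placeholder user classes (sketched later in section 6).
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.*;

  public class WordCountDriver {
  public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf(WordCountDriver.class);
  conf.setJobName("wordcount");
  // Output key/value types of the job
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  // Placeholder Mapper/Reducer implementations (user code)
  conf.setMapperClass(WordCountMapper.class);
  conf.setReducerClass(WordCountReducer.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);
  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  // Blocks until the job completes, printing progress to the console
  JobClient.runJob(conf);
  }
  }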
  JobClient's submitJob method is implemented as follows:
  public RunningJob submitJob(JobConf job) throws FileNotFoundException,
  InvalidJobConfException, IOException {
  // Obtain a new job id from the JobTracker
  JobID jobId = jobSubmitClient.getNewJobId();
  // Prepare to write everything the job needs into HDFS:
  // the jar containing the job's code goes into job.jar,
  // the input split information goes into job.split,
  // and the aggregated job configuration goes into job.xml
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // Upload the jars specified with the -libjars command-line option to HDFS
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  ……
  // Obtain the input splits from the configured InputFormat; the default type is FileSplit
  InputSplit[] splits =
  job.getInputFormat().getSplits(job, job.getNumMapTasks());
  // Create an output stream to write the input split information into the job.split file
  FSDataOutputStream out = FileSystem.create(fs,
  submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
  try {
  // The job.split file contains: a split file header, the split file version, the number of splits, followed by the information of each input split in turn.
  // For each input split it writes: the split class name (FileSplit by default), the split length, the split contents (for a FileSplit, the file name and the starting offset of this split within the file), and the split's location info (i.e., which DataNodes hold it).
  writeSplitsFile(splits, out);
  } finally {
  out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  // Set the number of map tasks to the number of splits
  job.setNumMapTasks(splits.length);
  // Write the job configuration into the job.xml file
  out = FileSystem.create(fs, submitJobFile,
  new FsPermission(JOB_FILE_PERMISSION));
  try {
  job.writeXml(out);
  } finally {
  out.close();
  }
  // Finally, call the JobTracker to actually submit the job
  JobStatus status = jobSubmitClient.submitJob(jobId);
  ……
  }
2. The JobTracker
  The JobTracker runs as a separate JVM. Its main function mainly does two things:

  • call the static method startTracker(new JobConf()) to create a JobTracker object;
  • call JobTracker.offerService() to start serving.
  In JobTracker's constructor, a taskScheduler member is created to schedule jobs; by default it is a JobQueueTaskScheduler, i.e. jobs are scheduled FIFO.
  In offerService, taskScheduler.start() is called, which registers two listeners with the JobTracker (the JobTracker acts as the taskScheduler's taskTrackerManager):

  • JobQueueJobInProgressListener jobQueueJobInProgressListener, which monitors the running state of jobs;
  • EagerTaskInitializationListener eagerTaskInitializationListener, which initializes jobs.
  EagerTaskInitializationListener contains a thread, JobInitThread, that keeps taking JobInProgress objects from jobInitQueue and calls each one's initTasks function to initialize the job; a rough sketch of that loop is shown below.
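  A minimal sketch of this initialization loop, assuming jobInitQueue is a plain java.util.List guarded by its own monitor (names and error handling are simplifications, not the exact Hadoop source):
  // Rough sketch of the init thread inside EagerTaskInitializationListener
  class JobInitThread implements Runnable {
  private final java.util.List<JobInProgress> jobInitQueue; // filled by jobAdded()

  JobInitThread(java.util.List<JobInProgress> jobInitQueue) {
  this.jobInitQueue = jobInitQueue;
  }

  public void run() {
  while (true) {
  JobInProgress job;
  synchronized (jobInitQueue) {
  // Wait until jobAdded() puts a newly submitted job into the queue
  while (jobInitQueue.isEmpty()) {
  try { jobInitQueue.wait(); } catch (InterruptedException ie) { return; }
  }
  job = jobInitQueue.remove(0);
  }
  try {
  // Create the map/reduce/setup/cleanup TaskInProgress objects
  job.initTasks();
  } catch (Throwable t) {
  // In the real code a failure here fails the job; omitted in this sketch
  }
  }
  }
  }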
  In the previous section the client called JobTracker.submitJob. That function first creates a JobInProgress object and then calls addJob, which contains the following logic:
  synchronized (jobs) {
  synchronized (taskScheduler) {
  jobs.put(job.getProfile().getJobID(), job);
  // Call jobAdded on every listener registered with the JobTracker
  for (JobInProgressListener listener : jobInProgressListeners) {
  listener.jobAdded(job);
  }
  }
  }
  EagerTaskInitializationListener's jobAdded function simply adds the JobInProgress object to jobInitQueue, which naturally triggers the initialization of this job, carried out by JobInProgress's initTasks function:
  public synchronized void initTasks() throws IOException {
  ……
  // Read the job.split file from HDFS to reconstruct the input splits
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
  fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
  splits = JobClient.readSplitFile(splitFile);
  } finally {
  splitFile.close();
  }
  // The number of map tasks equals the number of input splits
  numMapTasks = splits.length;
  // Create one TaskInProgress per map task, each handling one input split
  maps = new TaskInProgress[numMapTasks];
  for(int i=0; i < numMapTasks; ++i) {
  inputLength += splits[i].getDataLength();
  maps[i] = new TaskInProgress(jobId, jobFile, splits[i],
  jobtracker, conf, this, i);
  }
  // Map tasks are placed into nonRunningMapCache, a Map<Node, List<TaskInProgress>>, meaning a map task will preferably be assigned to the Node holding its input split. nonRunningMapCache is consulted when the JobTracker assigns map tasks to TaskTrackers.
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }
  // Create the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
  reduces[i] = new TaskInProgress(jobId, jobFile,
  numMapTasks, i,
  jobtracker, conf, this);
  // Reduce tasks go into nonRunningReduces, which is consulted when the JobTracker assigns reduce tasks to TaskTrackers.
  nonRunningReduces.add(reduces[i]);
  }
  // Create two cleanup tasks, one for the map side and one for the reduce side.
  cleanup = new TaskInProgress[2];
  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
  jobtracker, conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
  numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // Create two setup tasks, one for the map side and one for the reduce side.
  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
  jobtracker, conf, this, numMapTasks + 1 );
  setup[0].setJobSetupTask();
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
  numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  tasksInited.set(true); // initialization complete
  ……
  }
3. The TaskTracker
  The TaskTracker also runs as a separate JVM. Its main function mainly calls new TaskTracker(conf).run(), whose run function in turn mainly calls:
  State offerService() throws Exception {
  long lastHeartbeat = 0;
  // The TaskTracker process keeps running for as long as it is alive
  while (running && !shuttingDown) {
  ……
  long now = System.currentTimeMillis();
  // Send a heartbeat to the JobTracker at regular intervals
  long waitTime = heartbeatInterval - (now - lastHeartbeat);
  if (waitTime > 0) {
  synchronized(finishedCount) {
  if (finishedCount[0] == 0) {
  finishedCount.wait(waitTime);
  }
  finishedCount[0] = 0;
  }
  }
  ……
  // Send the heartbeat to the JobTracker and get the response
  HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
  ……
  // Extract from the response the actions this TaskTracker needs to perform
  TaskTrackerAction[] actions = heartbeatResponse.getActions();
  ……
  if (actions != null){
  for(TaskTrackerAction action: actions) {
  if (action instanceof LaunchTaskAction) {
  // If the action is to launch a new task, add it to the task queue
  addToTaskQueue((LaunchTaskAction)action);
  } else if (action instanceof CommitTaskAction) {
  CommitTaskAction commitAction = (CommitTaskAction)action;
  if (!commitResponses.contains(commitAction.getTaskID())) {
  commitResponses.add(commitAction.getTaskID());
  }
  } else {
  tasksToCleanup.put(action);
  }
  }
  }
  }
  return State.NORMAL;
  }
  The main logic of transmitHeartBeat is as follows:
  private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  // Every so often, counter statistics are included in the heartbeat sent to the JobTracker
  boolean sendCounters;
  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
  sendCounters = true;
  previousUpdate = now;
  }
  else {
  sendCounters = false;
  }
  ……
  // Report the current status of this TaskTracker to the JobTracker
  if (status == null) {
  synchronized (this) {
  status = new TaskTrackerStatus(taskTrackerName, localHostname,
  httpPort,
  cloneAndResetRunningTaskStatuses(
  sendCounters),
  failures,
  maxCurrentMapTasks,
  maxCurrentReduceTasks);
  }
  }
  ……
  // The TaskTracker asks the JobTracker for a new task when either of the following holds:
  // the number of map tasks currently running is below the maximum number of map slots, or
  // the number of reduce tasks currently running is below the maximum number of reduce slots
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
  askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
  status.countReduceTasks() < maxCurrentReduceTasks) &&
  acceptNewTasks;
  localMinSpaceStart = minSpaceStart;
  }
  ……
  // Send the heartbeat to the JobTracker; this is an RPC call
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
  justStarted, askForNewTask,
  heartbeatResponseId);
  ……
  return heartbeatResponse;
  }
4. The JobTracker
  When the heartbeat RPC reaches the JobTracker, its heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) function is invoked:
  public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
  boolean initialContact, boolean acceptNewTasks, short responseId)
  throws IOException {
  ……
  String trackerName = status.getTrackerName();
  ……
  short newResponseId = (short)(responseId + 1);
  ……
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  // If the TaskTracker is asking for a new task to run
  if (acceptNewTasks) {
  TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
  if (taskTrackerStatus == null) {
  LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
  } else {
  // Setup and cleanup tasks have the highest priority
  List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
  if (tasks == null ) {
  // Otherwise let the task scheduler assign tasks
  tasks = taskScheduler.assignTasks(taskTrackerStatus);
  }
  if (tasks != null) {
  for (Task task : tasks) {
  // Put the tasks into the actions list to be returned to the TaskTracker
  expireLaunchingTasks.addNewTask(task.getTaskID());
  actions.add(new LaunchTaskAction(task));
  }
  }
  }
  }
  ……
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(
  actions.toArray(new TaskTrackerAction[actions.size()]));
  ……
  return response;
  }
  The default task scheduler is JobQueueTaskScheduler; its assignTasks is as follows:
  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
  throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();
  // Compute the remaining map and reduce workload across all running jobs
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
  for (JobInProgress job : jobQueue) {
  if (job.getStatus().getRunState() == JobStatus.RUNNING) {
  int totalMapTasks = job.desiredMaps();
  int totalReduceTasks = job.desiredReduces();
  remainingMapLoad += (totalMapTasks - job.finishedMaps());
  remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
  }
  }
  }
  // Compute the average load each TaskTracker should carry: the remaining workload divided by the number of TaskTrackers
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
  maxMapLoad = Math.min(maxCurrentMapTasks,
  (int) Math.ceil((double) remainingMapLoad /
  numTaskTrackers));
  maxReduceLoad = Math.min(maxCurrentReduceTasks,
  (int) Math.ceil((double) remainingReduceLoad
  / numTaskTrackers));
  }
  ……
  // Maps take priority over reduces: if the number of map tasks running on this TaskTracker is below the average load, assign it a map task
  if (numMaps < maxMapLoad) {
  int totalNeededMaps = 0;
  synchronized (jobQueue) {
  for (JobInProgress job : jobQueue) {
  if (job.getStatus().getRunState() != JobStatus.RUNNING) {
  continue;
  }
  Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
  taskTrackerManager.getNumberOfUniqueHosts());
  if (t != null) {
  return Collections.singletonList(t);
  }
  ……
  }
  }
  }
  // Only after map tasks are assigned are reduce tasks considered
  if (numReduces < maxReduceLoad) {
  int totalNeededReduces = 0;
  synchronized (jobQueue) {
  for (JobInProgress job : jobQueue) {
  if (job.getStatus().getRunState() != JobStatus.RUNNING ||
  job.numReduceTasks == 0) {
  continue;
  }
  Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
  taskTrackerManager.getNumberOfUniqueHosts());
  if (t != null) {
  return Collections.singletonList(t);
  }
  ……
  }
  }
  }
  return null;
  }
  From the code above we can see that JobInProgress.obtainNewMapTask is used to assign a map task; it mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache according to the Node on which the TaskTracker resides. JobInProgress.obtainNewReduceTask is used to assign a reduce task; it mainly calls findNewReduceTask, which looks up a TaskInProgress in nonRunningReduces. A rough sketch of the locality-aware map lookup is shown below.
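  The following is an illustrative sketch of that locality-aware lookup, under the assumption that the cache keeps non-running map tasks per topology node; the helper name findMapTaskFor and the exact predicates are approximations, not the actual findNewMapTask source:
  // Walk up the network topology from the TaskTracker's node
  // (node-local, then rack-local, ...) and take a cached non-running
  // TaskInProgress from the first level that still has one.
  TaskInProgress findMapTaskFor(Node trackerNode,
  Map<Node, List<TaskInProgress>> nonRunningMapCache, int maxLevel) {
  Node node = trackerNode;
  for (int level = 0; level < maxLevel && node != null; ++level) {
  List<TaskInProgress> cached = nonRunningMapCache.get(node);
  if (cached != null) {
  while (!cached.isEmpty()) {
  // Pop from the tail; skip tasks that are already running or done
  TaskInProgress tip = cached.remove(cached.size() - 1);
  if (!tip.isRunning() && !tip.isComplete()) {
  return tip;
  }
  }
  nonRunningMapCache.remove(node); // nothing left at this level
  }
  node = node.getParent(); // move up one level (e.g. host -> rack)
  }
  return null; // caller falls back to a non-local or speculative task
  }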
5. The TaskTracker
  After the heartbeat is sent to the JobTracker, the response contains the assigned tasks as LaunchTaskAction objects. Each one is queued via addToTaskQueue: a map task goes into mapLauncher (of type TaskLauncher), a reduce task into reduceLauncher (also a TaskLauncher):
  private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
  mapLauncher.addToTaskQueue(action);
  } else {
  reduceLauncher.addToTaskQueue(action);
  }
  }
  TaskLauncher is a thread; its run function takes a TaskInProgress from the queue populated above and calls startNewTask(TaskInProgress tip) to launch it, which in turn mainly calls localizeJob(TaskInProgress tip). A simplified sketch of the launcher loop is shown next, followed by the actual localizeJob code.
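  The sketch below is a simplification: the field and helper names (tasksToLaunch, registerTask, waitForFreeSlot) are approximations and not the exact Hadoop source; only the overall shape of the loop is the point here.
  class TaskLauncher extends Thread {
  private final java.util.List<TaskInProgress> tasksToLaunch =
  new java.util.LinkedList<TaskInProgress>();

  // Called from addToTaskQueue above (registerTask is a hypothetical helper
  // that turns the LaunchTaskAction into a TaskTracker TaskInProgress)
  public void addToTaskQueue(LaunchTaskAction action) {
  synchronized (tasksToLaunch) {
  tasksToLaunch.add(registerTask(action));
  tasksToLaunch.notifyAll();
  }
  }

  public void run() {
  while (true) {
  TaskInProgress tip;
  synchronized (tasksToLaunch) {
  while (tasksToLaunch.isEmpty()) {
  try { tasksToLaunch.wait(); } catch (InterruptedException e) { return; }
  }
  tip = tasksToLaunch.remove(0);
  }
  waitForFreeSlot();   // block until a map/reduce slot is available
  startNewTask(tip);   // which mainly calls localizeJob(tip)
  }
  }
  }
  The actual localizeJob implementation, which copies the job's files to the TaskTracker's local filesystem, follows: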
  private void localizeJob(TaskInProgress tip) throws IOException {
  // The first thing to do is copy the job's files from HDFS to the TaskTracker's local filesystem: job.split, job.xml and job.jar
  Path localJarFile = null;
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  Path jobFile = new Path(t.getJobFile());
  ……
  Path localJobFile = lDirAlloc.getLocalPathForWrite(
  getLocalJobDir(jobId.toString())
  + Path.SEPARATOR + "job.xml",
  jobFileSize, fConf);
  RunningJob rjob = addTaskToJob(jobId, tip);
  synchronized (rjob) {
  if (!rjob.localized) {
  FileSystem localFs = FileSystem.getLocal(fConf);
  Path jobDir = localJobFile.getParent();
  ……
  // Copy the job configuration file (job.xml) from HDFS to the local filesystem
  systemFS.copyToLocalFile(jobFile, localJobFile);
  JobConf localJobConf = new JobConf(localJobFile);
  Path workDir = lDirAlloc.getLocalPathForWrite(
  (getLocalJobDir(jobId.toString())
  + Path.SEPARATOR + "work"), fConf);
  if (!localFs.mkdirs(workDir)) {
  throw new IOException("Mkdirs failed to create "
  + workDir.toString());
  }
  System.setProperty("job.local.dir", workDir.toString());
  localJobConf.set("job.local.dir", workDir.toString());
  // copy Jar file to the local FS and unjar it.
  String jarFile = localJobConf.getJar();
  long jarFileSize = -1;
  if (jarFile != null) {
  Path jarFilePath = new Path(jarFile);
  localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
  getLocalJobDir(jobId.toString())
  + Path.SEPARATOR + "jars",
  5 * jarFileSize, fConf), "job.jar");
  if (!localFs.mkdirs(localJarFile.getParent())) {
  throw new IOException("Mkdirs failed to create jars directory ");
  }
  // Copy job.jar to the local filesystem
  systemFS.copyToLocalFile(jarFilePath, localJarFile);
  localJobConf.setJar(localJarFile.toString());
  // Write the job's configuration out as job.xml
  OutputStream out = localFs.create(localJobFile);
  try {
  localJobConf.writeXml(out);
  } finally {
  out.close();
  }
  // Unpack job.jar
  RunJar.unJar(new File(localJarFile.toString()),
  new File(localJarFile.getParent().toString()));
  }
  rjob.localized = true;
  rjob.jobConf = localJobConf;
  }
  }
  // Actually launch the task
  launchTaskForJob(tip, new JobConf(rjob.jobConf));
  }
  Once all the resources the task needs have been copied to the local machine, launchTaskForJob is called, which in turn calls TaskInProgress's launchTask function:
  public synchronized void launchTask() throws IOException {
  ……
  // Create the task's working directory
  localizeTask(task);
  if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
  this.taskStatus.setRunState(TaskStatus.State.RUNNING);
  }
  // Create and start a TaskRunner: a MapTaskRunner for a MapTask, a ReduceTaskRunner for a ReduceTask
  this.runner = task.createRunner(TaskTracker.this, this);
  this.runner.start();
  this.taskStatus.setStartTime(System.currentTimeMillis());
  }
  TaskRunner is a thread; its run function is as follows:
  public final void run() {
  ……
  TaskAttemptID taskid = t.getTaskID();
  LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  File jobCacheDir = null;
  if (conf.getJar() != null) {
  jobCacheDir = new File(
  new Path(conf.getJar()).getParent().toString());
  }
  File workDir = new File(lDirAlloc.getLocalPathToRead(
  TaskTracker.getLocalTaskDir(
  t.getJobID().toString(),
  t.getTaskID().toString(),
  t.isTaskCleanupTask())
  + Path.SEPARATOR + MRConstants.WORKDIR,
  conf). toString());
  FileSystem fileSystem;
  Path localPath;
  ……
  // Assemble the classpath
  String baseDir;
  String sep = System.getProperty("path.separator");
  StringBuffer classPath = new StringBuffer();
  // start with same classpath as parent process
  classPath.append(System.getProperty("java.class.path"));
  classPath.append(sep);
  if (!workDir.mkdirs()) {
  if (!workDir.isDirectory()) {
  LOG.fatal("Mkdirs failed to create " + workDir.toString());
  }
  }
  String jar = conf.getJar();
  if (jar != null) {      
  // if the job jar exists, add its lib/, classes/ and root directories to the classpath
  File[] libs = new File(jobCacheDir, "lib").listFiles();
  if (libs != null) {
  for (int i = 0; i < libs.length; i++) {
  classPath.append(sep);            // add libs from jar to classpath
  classPath.append(libs[i]);
  }
  }
  classPath.append(sep);
  classPath.append(new File(jobCacheDir, "classes"));
  classPath.append(sep);
  classPath.append(jobCacheDir);
  }
  ……
  classPath.append(sep);
  classPath.append(workDir);
  // Assemble the java command line and its arguments
  Vector<String> vargs = new Vector<String>(8);
  File jvm =
  new File(new File(System.getProperty("java.home"), "bin"), "java");
  vargs.add(jvm.toString());
  String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
  javaOpts = javaOpts.replace("@taskid@", taskid.toString());
  String [] javaOptsSplit = javaOpts.split(" ");
  String libraryPath = System.getProperty("java.library.path");
  if (libraryPath == null) {
  libraryPath = workDir.getAbsolutePath();
  } else {
  libraryPath += sep + workDir;
  }
  boolean hasUserLDPath = false;
  for(int i=0; i<javaOptsSplit.length ;i++) {
  if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
  javaOptsSplit[i] += sep + libraryPath;
  hasUserLDPath = true;
  break;
  }
  }
  if(!hasUserLDPath) {
  vargs.add("-Djava.library.path=" + libraryPath);
  }
  for (int i = 0; i < javaOptsSplit.length; i++) {
  vargs.add(javaOptsSplit[i]);
  }
  // Set up the temp directory for the Child process
  String tmp = conf.get("mapred.child.tmp", "./tmp");
  Path tmpDir = new Path(tmp);
  if (!tmpDir.isAbsolute()) {
  tmpDir = new Path(workDir.toString(), tmp);
  }
  FileSystem localFs = FileSystem.getLocal(conf);
  if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
  throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  }
  vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
  // Add classpath.
  vargs.add("-classpath");
  vargs.add(classPath.toString());
  // Log directory settings
  long logSize = TaskLog.getTaskLogLength(conf);
  vargs.add("-Dhadoop.log.dir=" +
  new File(System.getProperty("hadoop.log.dir")
  ).getAbsolutePath());
  vargs.add("-Dhadoop.root.logger=INFO,TLA");
  vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
  vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
  // The main class of the child process that runs map and reduce tasks is Child
  vargs.add(Child.class.getName());  // main of Child
  ……
  // Launch the child JVM
  jvmManager.launchJvm(this,
  jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
  workDir, env, pidFile, conf));
  }
6. The Child Process
  The actual map and reduce tasks run inside the Child process. The main logic of Child's main function is as follows:
  while (true) {
  // Obtain a JvmTask object from the TaskTracker over the network
  JvmTask myTask = umbilical.getTask(jvmId);
  ……
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  ……
  // Run the task
  task.run(job, umbilical);             // run the task
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
  break;
  }
  }
6.1 MapTask
  If the task is a MapTask, its run function is as follows:
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  // Used to communicate with the TaskTracker and report progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  ……
  // The map task's output collector
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
  collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
  collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // Deserialize the input split and, based on its information, create a RecordReader to read the data
  instantiatedSplit = (InputSplit)
  ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
  FileSplit fileSplit = (FileSplit) instantiatedSplit;
  job.set("map.input.file", fileSplit.getPath().toString());
  job.setLong("map.input.start", fileSplit.getStart());
  job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn =                  // open input
  job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping() ?
  new SkippingRecordReader(rawIn, getCounters(), umbilical) :
  new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // For a map task, create a MapRunnable; the default is MapRunner
  MapRunnable runner =
  ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
  // MapRunner's run function reads records from the RecordReader one by one and passes each to the Mapper's map function
  runner.run(in, collector, reporter);     
  collector.flush();
  } finally {
  in.close();                               // close input
  collector.close();
  }
  done(umbilical);
  }
  MapRunner's run function reads records from the RecordReader one by one and calls the Mapper's map function on each of them:
  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
  Reporter reporter)
  throws IOException {
  try {
  K1 key = input.createKey();
  V1 value = input.createValue();
  while (input.next(key, value)) {
  mapper.map(key, value, output, reporter);
  if(incrProcCount) {
  reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
  SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
  }
  }
  } finally {
  mapper.close();
  }
  }
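  As a point of reference, this is what a user-level Mapper in the old mapred API typically looks like, i.e. the class being driven by MapRunner above. The word-count logic is an illustrative example, not something taken from the original article:
  import java.io.IOException;
  import java.util.StringTokenizer;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.Mapper;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reporter;

  public class WordCountMapper extends MapReduceBase
  implements Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private final Text word = new Text();

  // Called once per input record by MapRunner.run; output goes to the
  // OutputCollector (a MapOutputBuffer when the job has reduce tasks).
  public void map(LongWritable key, Text value,
  OutputCollector<Text, IntWritable> output,
  Reporter reporter) throws IOException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
  word.set(itr.nextToken());
  output.collect(word, one);
  }
  }
  }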
  All map outputs are collected into the MapOutputBuffer, whose collect function is as follows:
  public synchronized void collect(K key, V value)
  throws IOException {
  reporter.progress();
  ……
  // As this shows, the buffer is a ring (circular) data structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
  boolean kvfull;
  do {
  // In the ring, if the next free slot catches up with the start position, the buffer is full
  kvfull = kvnext == kvstart;
  // Check whether the soft limit for spilling the buffer to disk has been reached
  final boolean kvsoftlimit = ((kvnext > kvend)
  ? kvnext - kvend > softRecordLimit
  : kvend - kvnext <= kvoffsets.length - softRecordLimit);
  // If the threshold has been reached, start spilling the buffer to disk as a spill file.
  // startSpill mainly notifies the background SpillThread, whose run() calls sortAndSpill() to sort, combine and write to disk
  if (kvstart == kvend && kvsoftlimit) {
  startSpill();
  }
  // If the buffer is full, wait for the spill to finish
  if (kvfull) {
  while (kvstart != kvend) {
  reporter.progress();
  spillDone.await();
  }
  }
  } while (kvfull);
  } finally {
  spillLock.unlock();
  }
  try {
  // If the buffer is not full, serialize the key and value into it
  int keystart = bufindex;
  keySerializer.serialize(key);
  final int valstart = bufindex;
  valSerializer.serialize(value);
  int valend = bb.markRecord();
  // Call the configured partitioner to obtain the partition id for this key/value pair
  final int partition = partitioner.getPartition(key, value, partitions);
  mapOutputRecordCounter.increment(1);
  mapOutputByteCounter.increment(valend >= keystart
  ? valend - keystart
  : (bufvoid - keystart) + valend);
  // Record the partition id and the key/value offsets within the buffer in the index arrays
  int ind = kvindex * ACCTSIZE;
  kvoffsets[kvindex] = ind;
  kvindices[ind + PARTITION] = partition;
  kvindices[ind + KEYSTART] = keystart;
  kvindices[ind + VALSTART] = valstart;
  kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
  LOG.info("Record too large for in-memory buffer: " + e.getMessage());
  spillSingleRecord(key, value);
  mapOutputRecordCounter.increment(1);
  return;
  }
  }
  The in-memory buffer is laid out as follows:
  (See the analyses by several Hadoop experts at http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx and http://caibinbupt.iyunv.com/.)
[Figure: layout of the in-memory map output buffer — kvoffsets, kvindices and the key/value byte buffer]

  kvoffsets exists so that the records can be sorted before being spilled to disk.
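  To make the index layout concrete, here is a small sketch (not the Hadoop source; the constant names mirror those used in collect above) of how one record's entry is written into the two index arrays, and why sorting only has to permute kvoffsets:
  // Each record occupies one slot in kvoffsets and ACCTSIZE (=3) ints in kvindices:
  //   kvindices[off + PARTITION] -> partition id of the record
  //   kvindices[off + KEYSTART]  -> offset of the serialized key in the byte buffer
  //   kvindices[off + VALSTART]  -> offset of the serialized value in the byte buffer
  // The sort before a spill permutes kvoffsets only, comparing records by
  // (partition, key) while the serialized bytes stay in place.
  static final int PARTITION = 0;
  static final int KEYSTART = 1;
  static final int VALSTART = 2;
  static final int ACCTSIZE = 3;

  static void recordEntry(int[] kvoffsets, int[] kvindices, int kvindex,
  int partition, int keystart, int valstart) {
  int off = kvindex * ACCTSIZE;
  kvoffsets[kvindex] = off;
  kvindices[off + PARTITION] = partition;
  kvindices[off + KEYSTART] = keystart;
  kvindices[off + VALSTART] = valstart;
  }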
  As noted above, the function that spills the in-memory buffer to a spill file on disk is sortAndSpill:
  private void sortAndSpill() throws IOException {
  ……
  FSDataOutputStream out = null;
  FSDataOutputStream indexOut = null;
  IFileOutputStream indexChecksumOut = null;
  // Create the spill file on disk
  Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
  numSpills, size);
  out = rfs.create(filename);
  ……
  final int endPosition = (kvend > kvstart)
  ? kvend
  : kvoffsets.length + kvend;
  // Sort the records in the buffer in partition order
  sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  int spindex = kvstart;
  InMemValBytes value = new InMemValBytes();
  // Write the partitions to the file one by one
  for (int i = 0; i < partitions; ++i) {
  IFile.Writer<K, V> writer = null;
  long segmentStart = out.getPos();
  writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
  // If no combiner is configured, write directly to the file
  if (null == combinerClass) {
  ……
  writer.append(key, value);
  ++spindex;
  }
  else {
  ……
  // If a combiner is configured, run it first (calling combiner.reduce(...)) and then write to the file
  combineAndSpill(kvIter, combineInputCounter);
  }
  }
  ……
  }
  When the map phase finishes, MapOutputBuffer's flush function is called. It too calls sortAndSpill to spill whatever is left in the buffer, and then calls mergeParts to merge the multiple spill files already on disk:
  private void mergeParts() throws IOException {
  ……
  // For each partition
  for (int parts = 0; parts < partitions; parts++){
  //create the segments to be merged
  List<Segment<K, V>> segmentList =
  new ArrayList<Segment<K, V>>(numSpills);
  TaskAttemptID mapId = getTaskID();
  // Collect from each spill file the segment belonging to the current partition
  for(int i = 0; i < numSpills; i++) {
  final IndexRecord indexRecord =
  getIndexInformation(mapId, i, parts);
  long segmentOffset = indexRecord.startOffset;
  long segmentLength = indexRecord.partLength;
  Segment<K, V> s =
  new Segment<K, V>(job, rfs, filename, segmentOffset,
  segmentLength, codec, true);
  segmentList.add(i, s);
  }
  // Merge the segments belonging to the same partition
  RawKeyValueIterator kvIter =
  Merger.merge(job, rfs,
  keyClass, valClass,
  segmentList, job.getInt("io.sort.factor", 100),
  new Path(getTaskID().toString()),
  job.getOutputKeyComparator(), reporter);
  // Write the merged segment to the final output file
  long segmentStart = finalOut.getPos();
  Writer<K, V> writer =
  new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
  if (null == combinerClass || numSpills < minSpillsForCombine) {
  Merger.writeFile(kvIter, writer, reporter, job);
  } else {
  combineCollector.setWriter(writer);
  combineAndSpill(kvIter, combineInputCounter);
  }
  ……
  }
  }
6.2 ReduceTask
  ReduceTask's run function is as follows:
  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  job.setBoolean("mapred.skip.on", isSkipping());
  // A reduce task has three phases: copy, sort, and reduce
  if (isMapOrReduce()) {
  copyPhase = getProgress().addPhase("copy");
  sortPhase  = getProgress().addPhase("sort");
  reducePhase = getProgress().addPhase("reduce");
  }
  startCommunicationThread(umbilical);
  final Reporter reporter = getReporter(umbilical);
  initialize(job, reporter);
  // Copy phase: ReduceCopier's fetchOutputs fetches the map outputs. It creates multiple MapOutputCopier threads, whose copyOutput method does the actual copying.
  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  if (!isLocal) {
  reduceCopier = new ReduceCopier(umbilical, job);
  if (!reduceCopier.fetchOutputs()) {
  ……
  }
  }
  copyPhase.complete();
  // Sort phase: merge the fetched map outputs until the number of files drops below io.sort.factor, returning an iterator over the key/value pairs
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
  ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
  job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
  !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
  new Path(getTaskID().toString()), job.getOutputKeyComparator(),
  reporter)
  : reduceCopier.createKVIterator(job, rfs, reporter);
  mapOutputFilesOnDisk.clear();
  sortPhase.complete();
  // Reduce phase
  setPhase(TaskStatus.Phase.REDUCE);
  ……
  Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  Class keyClass = job.getMapOutputKeyClass();
  Class valClass = job.getMapOutputValueClass();
  ReduceValuesIterator values = isSkipping() ?
  new SkippingReduceValuesIterator(rIter,
  job.getOutputValueGroupingComparator(), keyClass, valClass,
  job, reporter, umbilical) :
  new ReduceValuesIterator(rIter,
  job.getOutputValueGroupingComparator(), keyClass, valClass,
  job, reporter);
  // Read each key and its list of values, and call the Reducer's reduce function
  while (values.more()) {
  reduceInputKeyCounter.increment(1);
  reducer.reduce(values.getKey(), values, collector, reporter);
  values.nextKey();
  values.informReduceProgress();
  }
  reducer.close();
  out.close(reporter);
  done(umbilical);
  }
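  For symmetry with the Mapper example, this is what the user-level Reducer driven by the loop above typically looks like in the old mapred API; again an illustrative word-count reducer, not taken from the original article:
  import java.io.IOException;
  import java.util.Iterator;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reducer;
  import org.apache.hadoop.mapred.Reporter;

  public class WordCountReducer extends MapReduceBase
  implements Reducer<Text, IntWritable, Text, IntWritable> {
  // Called once per key by the reduce loop above; values iterates over all
  // values grouped under this key by the sort/merge phase.
  public void reduce(Text key, Iterator<IntWritable> values,
  OutputCollector<Text, IntWritable> output,
  Reporter reporter) throws IOException {
  int sum = 0;
  while (values.hasNext()) {
  sum += values.next().get();
  }
  output.collect(key, new IntWritable(sum));
  }
  }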
7. Summary
  The overall Map-Reduce process is summarized in the figure below:
[Figure: overall Map-Reduce process, from client job submission through JobTracker scheduling, TaskTracker execution, map spill/merge and reduce copy/sort/reduce]
