设为首页 收藏本站
查看: 494|回复: 0

[经验分享] 大数据框架hadoop的作业提交过程

[复制链接]

尚未签到

发表于 2016-12-12 08:45:47 | 显示全部楼层 |阅读模式
    作业提交过程比较简单,它主要为后续作业执行准备环境,主要涉及创建目录、上传文件等操作;而一旦用户提交作业后,JobTracker端便会对作业进行初始化。作业初始化的主要工作是根据输入数据量和作业配置参数将作业分解成若干个Map Task以及Reduce Task,并添加到相关数据结构中,以等待后续被高度执行。总之,可将作业提交与初始化过程分为四个步骤,如下所示:    步骤一:用户使用Hadoop提供的Shell命令提交作业。
    步骤二:JobClient按照作业配置信息(JobConf)将作业运行需要的全部文件上传到JobTracker文件系统(通常为HDFS)的某个目录下。
步骤三:JobClient调用RPC接口向JobTracker提交作业。
步骤四:JobTracker接收到作业后,将其告知TaskScheduler,由TaskScheduler对作业进行初始化。

作业提交过程详解
1.1 执行Shell命令
Shell示例如下:
$HADOOP_HOME/bin/hadoop jar example.jar \
-D mapred.job.name=example \
-D mapred.reduce.tasks=2 \
-files=blacklist.txt,whitelist.txt \
-libjars=third-party.jar \
-archives=dictionary.zip \
-input /test/input \
-output /test/output
当用户输入以上命令后,hadoop脚本根据“jar”命令将作业交给RunJar类处理,相关代码如下:
elif [ "$COMMAND" = "jar" ] ; then
  CLASS=org.apache.hadoop.util.RunJar
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
用户的MapReduce程序已经配置好了作业运行时需要的各种信息(如Mapper类,Reducer类,Reduce Task个数等),它最终在main函数中调用JobClient.runJob函数(新MapReduce API则使用job.waitForCompletion(true)函数)提交作业,这之后会依次经过下表的调用顺序才会将作业提交到JobTracker端。
调用顺序


方法

1

JobClient

runJob(JobConf job)
2

submitJob(JobConf job)
3

submitJobInternal(final JobConf job)

4

JobTracker

submitJob(JobID jobId, String jobSubmitDir, Credentials ts)

 
1.2 作业文件上传
JobClient将作业提交到JobTracker端之前,需要进行一些初始化工作,包括:获取作业ID,创建HDFS目录,上传作业文件以及生成Split文件等。这些工作由函数JobClient.submitJobInternal(job)实现,具体流程见下表:
序号

调用方法

1

JobTracker.getNewJob()
2

HDFS.mkdirs()
3

HDFS.copyRemoteFiles()
4

HDFS.writeSplits()
5

HDFS.writeXml()
6

JobTracker.submitJob(job)
HDFS.mkdirs()方法相关代码如下:
public class JobTracker {

... ...

// 获取需要创建的工作目录

private String getStagingAreaDirInternal(String user)  {

final Path stagingRootDir =

      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",

            "/tmp/hadoop/mapred/staging"));

    final FileSystem fs = stagingRootDir.getFileSystem(conf);

    return fs.makeQualified(new Path(stagingRootDir,

                              user+"/.staging")).toString();

}

... ...

}

public class JobClient {

... ...

// 创建工作目录

private void copyAndConfigureFiles(JobConf job, Path submitJobDir

      short replication)
... ...
FileSystem.mkdirs(fssubmitJobDirmapredSysPerms);
... ...
}
... ...
}
HDFS.copyRemoteFiles()方法相关代码如下:
public class JobClient {

... ...

// 将客户端的相关文件拷贝到HDFS

private void copyAndConfigureFiles(JobConf job, Path submitJobDirshort replication)

  ... ...

  String files = job.get("tmpfiles");

  String libjars = job.get("tmpjars");

  String archives = job.get("tmparchives");
  if (files != null) {
... ...
// copies a file to the jobtracker filesystem and returns the path where it was copied to
Path newPath = copyRemoteFiles(fs,filesDirtmpjobreplication);

... ...

  }
  if (libjars != null) {
... ...
    Path newPath = copyRemoteFiles(fslibjarsDirtmpjobreplication);
... ...
  }
  if (archives != null) {
... ...
    Path newPath = copyRemoteFiles(fsarchivesDirtmpjobreplication);
... ...
  }
}
... ...
}
 
MapReduce作业文件的上传与下载是由DistributedCache工具完成的。这是Hadoop为方便用户进行应用程序开发而设计的数据分发工具。其整个工作流程对用户而言是透明的,也就是说,用户只需在提交作业时指定文件位置,至于这些文件的分发(需广播到各个TaskTracker上以运行Task),完全由DistricutedCache工具完成,不需要用户参与。
通常而言,对于一个典型的Java MapReduce作业,可能包含以下资源。
程序jar包:用户用Java编写的MapReduce应用程序jar包。
作业配置文件:描述MapReduce应用程序的配置信息
依赖的第三方jar包:应用程序依赖的第三方jar包,提交作业时用参数“-libjars”指定。
依赖的归档文件:应用程序中用到多个文件,可直接打包成归档文件,提交作业时用参数“-archives”指定。
依赖的普通文件:应用程序中可能用到普通文件,比如文本格式的字典文件,提交作业时用参数“-files”指定。
上述所有文件在JobClient端被提交到HDSF上,涉及的父目录如下表所示:
作业属性

属性值

说明

mapreduce.jobtracker.staging.root.dir
${hadoop.tmp.dir}/mapred/staging
HDFS上作业文件的上传目录,由管理员配置
mapreduce.job.dir
${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/${jobId}
用户${user}的作业${jobId}相关文件存放目录
文件上传完毕后,会将这些目录信息保存到作业配置对象JobConf中,其对应的作业属性如下表所示:
作业属性

说明

mapred.cache.files
作业依赖的普通文件在HDFS上的存放路径
mapred.job.classpath.archives
作业依赖的jar包在HDFS上的存放路径
mapred.cache.archives
作业依赖的压缩文件在HDFS上的存放路径
mapreduce.job.cache.files.visibilities
作业依赖的普通文件的可见性。如果是public可见性,则为true,否则为false
mapreduce.job.cache.archives.visibilities
作业依赖的归档文件的可见性。如果是public级别的可见性,则为true,否则为false
mapred.cache.files.timestamps
作业依赖的普通文件的最后一次修改时间的时间戳
mapred.cache.archives.timestamps
作业依赖的压缩文件的最后一次修改时间的时间戳
mapred.cache.files.filesizes
作业依赖的普通文件的大小
mapred.cache.archives.filesizes
作业依赖的归档文件的大小
mapred.jar
用户应用程序jar路径
 
    作业文件上传到HDFS后,可能会有大量节点同时从HDFS上下载这些文件,进而产生文件访问热点现象,造成性能瓶颈。为此,JobClient上传这些文件时会调高它们的副本数(由参数mapred.submit.replication指定,默认是10)以通过分摊负载方式避免产生访问热点。
1.3 产生InputSplit文件
用户提交MapReduce作业后,JobClient会调用InputFormatgetSplits方法生成InputSplit相关信息。该信息包括两部分:InputSplit元数据信息和原始InputSplit信息。其中,第一部分将被JobTracker使用,用以生成Task本地性相关的数据结构;而第二部分则将被Map Task初始化时使用,用以获取自己要处理的数据。相关代码如下:

public interface InputFormat<K, V> {

  /** 

   * Logically split the set of input files for the job.  

   */

  InputSplit[] getSplits(JobConf jobint numSplitsthrows IOException;

  RecordReader<K, V> getRecordReader(InputSplit split,JobConf job Reporter      reporterthrows IOException;

}

 

这两部分信息分别被保存到目录${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/${JobId}下的文件job.splitjob.splitmetainfo中。

InputSplit相关操作放在包org.apache.hadoop.mapreduce.split中,主要包含三个类JobSplitJobSplitWriterSplitMetaInfoReader
JobSplit封装了读写InputSplit相关的基础类,主要包括以下三个。
SplitMetaInfo:描述一个InputSplit的元数据信息,包括以下三项内容:
private long startOffset; //该InputSplit元信息在job.split文件中的偏移量
private long inputDataLength; //该InputSplit的数据长度
private String[] locations;    // 该InputSplit所在的host列表
所有InputSplit对应的SplitMetaInfo将被保存到文件job.splitmetainfo中。该文件内容组织方式如下表所示,内容依次为:一个用于标识InputSplit元数据文件头的字符串“META-SP,文件版本号splitVersion,作业对应的InputSplit数目length,最后是lengthInputSplit对应的SplitMetaInfo信息。
META-SP

splitVersion

length

SplitMetaInfo

SplitMetaInfo

... ...

 
TaskSplitMetaInfo:用于保存InputSplit元信息的数据结构,包括以下三项内容:
pivate TaskSplitIndex splitIndex; //Split元信息在job.split文件中的位置
private long inputDataLength;    //InputSplit的数据长度
private String[] locations;        //InputSplit所在的host列表
    这些信息是在作业初始化时,JobTracker从文件job.splitmetainfo中获取的。其中,host列表信息是任务调度器判断任务是否具有本地性的最重要因素,而splitIndex信息保存了新任务需处理的数据位置信息在文件job.split中的索引,TaskTracker收到信息后,便可以从job.split文件中读取InputSplit信息,进而运行一个新任务。
 
TaskSplitIndex:JobTrackerTaskTracker分配新任务时,TaskSplitIndex用于指定新任务待处理数据位置信息在文件job.split中的索引,主要包括两项内容:
private String splitLocation; //job.split文件的位置(目录)
private long startOffset;    // InputSplit信息在job.split文件中的位置
相关代码如下:
public class JobSplit {

   ... ...  

  /**

   * This represents the meta information about the task split.

   */

  public static class SplitMetaInfo implements Writable {

    private long startOffset;

    private long inputDataLength;

    private String[] locations;

  }

  /**

   * This represents the meta information about the task split that the 

   * JobTracker creates

   */

  public static class TaskSplitMetaInfo {

    private TaskSplitIndex splitIndex;

    private long inputDataLength;

    private String[] locations;

  }

  /**

   * This represents the meta information about the task split that the 

   * task gets

   */

  public static class TaskSplitIndex {

    private String splitLocation;

    private long startOffset;

  }

}
1.4 作业提交到JobTracker
JobClient最终调用RPC方法submitJob将作业提交到JobTracker端,在JobTracker.submitJob中,会依次进行以下操作:
1) 为作业创建JobInProgress对象
JobTracker会为每个作业创建一个JobInProgress对象。该对象维护了作业的运行时信息。它在作业运行过程中一直存在,主要用于跟踪正在运行作业的运行状态和进度。相关代码如下:
job = new JobInProgress(thisthis.confjobInfo, 0, ts);
2) 检查用户是否具有指定队列的作业提交权限
    Hadoop以队列为单位管理作业和资源,每个队列分配有一定量的资源,每个用户属于一个或者多个队列且只能使用所属队列中的资源。管理员可为每个队列指定哪些用户具有作业提交权限和管理权限。相关代码如下:
 aclsManager.checkAccess(jobugi, Operation.SUBMIT_JOB);
3) 检查作业配置的内存使用量是否合理
用户提交作业时,可分别用参数mapred.job.map.memory.mbmapred.job.reduce.memory.mb指定Map TaskReduce Task占用的内存量;而管理员可通过参数mapred.cluster.max.map.memory.mbmapred.cluster.max.reduce.memory.mb限制用户配置的任务最大内存使用量,一旦用户配置的内存使用量超过系统限制,则作业提交失败。相关代码如下:
// Check the job if it cannot run in the cluster because of invalid memory

// requirements.

try {

    checkMemoryRequirements(job);

catch (IOException ioe) {

    throw ioe;

}
4) 通知TaskScheduler初始化作业
JobTracker收到作业后,并不会马上对其初始化,而是交给调度器,由它按照一定的策略对作业初始化。相关代码如下:
synchronized (taskScheduler) {

        jobs.put(job.getProfile().getJobID(), job);

        for (JobInProgressListener listener : jobInProgressListeners) {

          listener.jobAdded(job);

        }

}

之所以不选择JobTracker而让调度器初始化,主要考虑到以下两个原因:
作业一旦初始化便会占用一定量的内存资源,为了防止大量初始化的作业排队等待调度而占用大量不必要的内存资源,Hadoop按照一定的策略选择性地初始化作业以节省内存资源;
任务调度器的职责是根据每个节点的资源使用情况对其分配最合适的任务,而只有经过初始化的作业才有可能得到调度,因而将作业初始化策略嵌到调度器中是一种编辑部合理的设计。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312997-1-1.html 上篇帖子: (8)基于hadoop的简单网盘应用实现4 下篇帖子: Hadoop 新特性、改进、优化和Bug分析系列5
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表