[Experience Sharing] Hadoop 2.2.0 Job Source Code Reading Notes

Everything discussed in this article refers to the 2.2.0 release.

Overview:
From the point of view of whoever creates and submits it, a Job can be configured at creation time to control its execution, and its running state can be queried. Once a Job has been submitted, it can no longer be reconfigured; any attempt to do so throws an IllegalStateException.
Normally a user creates, describes, and submits a Job, and monitors its progress, through the Job class. Below is a simple example (adapted from the class javadoc; the javadoc's setInputPath/setOutputPath calls do not exist on Job, so the FileInputFormat/FileOutputFormat helpers are used instead):



// Create a new Job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(MyJob.class);
// Specify various job-specific parameters
job.setJobName("myjob");
// Input/output paths are set via the org.apache.hadoop.mapreduce.lib.input
// and .lib.output helper classes, not on Job itself
FileInputFormat.addInputPath(job, new Path("in"));
FileOutputFormat.setOutputPath(job, new Path("out"));
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
Basic structure:
The Job class lives in the org.apache.hadoop.mapreduce package; it extends JobContextImpl and implements the JobContext interface.
Static constants defined by Job:



private static final Log LOG = LogFactory.getLog(Job.class);

@InterfaceStability.Evolving
public static enum JobState {DEFINE, RUNNING};

private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";

/** Key in mapred-*.xml that sets completionPollInvervalMillis */
public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";
/** Default completionPollIntervalMillis is 5000 ms. */
static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;

/** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
/** Default progMonitorPollIntervalMillis is 1000 ms. */
static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";

private static final String TASKLOG_PULL_TIMEOUT_KEY =
    "mapreduce.client.tasklog.timeout";
private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
Private fields defined by Job:



private JobState state = JobState.DEFINE;
private JobStatus status;
private long statustime;
private Cluster cluster;
Static block that loads the configuration files as soon as the Job class is loaded:



static {
    ConfigUtil.loadResources();
}
The resources loaded are mapred-default.xml, mapred-site.xml, yarn-default.xml, and yarn-site.xml.
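Because ConfigUtil.loadResources() registers these files as default resources for every Configuration, merely loading the Job class changes what a fresh Configuration resolves. A minimal sketch of that side effect, assuming the Hadoop client jars (which bundle mapred-default.xml) are on the classpath; the class name is just illustrative:

public class DefaultResourceDemo {
    public static void main(String[] args) throws Exception {
        // Touching the Job class runs its static initializer, which calls
        // ConfigUtil.loadResources() and registers mapred-*.xml / yarn-*.xml
        // as default resources for all Configuration instances.
        org.apache.hadoop.mapreduce.Job.getInstance();

        // This Configuration now sees MapReduce defaults, e.g. the default
        // number of reduce tasks declared in mapred-default.xml.
        org.apache.hadoop.conf.Configuration conf =
            new org.apache.hadoop.conf.Configuration();
        System.out.println(conf.get("mapreduce.job.reduces")); // typically "1"
    }
}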
  
Job constructors:



@Deprecated
public Job() throws IOException {
    this(new Configuration());
}

@Deprecated
public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
}

@Deprecated
public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
}

Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
}

Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
}
Notice that Hadoop discourages constructing a Job through the no-argument constructor or directly from a Configuration: all three public constructors are marked @Deprecated. Building a Job from a JobConf is the supported path, but since the Job(JobConf) constructor is package-private, external code reaches it through the static getInstance() factory methods covered below.
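A quick sketch of the two construction styles side by side; both yield an equivalent Job in 2.2.0, and the deprecated form only draws a compiler warning (the demo class name is hypothetical):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ConstructionStyleDemo {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        // Deprecated public constructor: still functional, but discouraged.
        Job legacy = new Job(conf);

        // Recommended: the factory wraps conf in a JobConf and delegates to
        // the package-private Job(JobConf) constructor shown above.
        Job current = Job.getInstance(conf);

        System.out.println(legacy.getJobName() + " / " + current.getJobName());
    }
}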
  
Factory methods for obtaining a Job instance:
Besides the constructors, the Job class provides several static methods that return a Job instance. Their definitions:



/**
 * Creates a new {@link Job} with no particular {@link Cluster} .
 * A Cluster will be created with a generic {@link Configuration}.
 *
 * @return the {@link Job} , with no connection to a cluster yet.
 * @throws IOException
 */
public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and a
 * given {@link Configuration}.
 *
 * The Job makes a copy of the Configuration so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * @param conf the configuration
 * @return the {@link Job} , with no connection to a cluster yet.
 * @throws IOException
 */
public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The Job makes a copy of the Configuration so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param conf the configuration
 * @return the {@link Job} , with no connection to a cluster yet.
 * @throws IOException
 */
public static Job getInstance(Configuration conf, String jobName)
        throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and given
 * {@link Configuration} and {@link JobStatus}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The Job makes a copy of the Configuration so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param status job status
 * @param conf job configuration
 * @return the {@link Job} , with no connection to a cluster yet.
 * @throws IOException
 */
public static Job getInstance(JobStatus status, Configuration conf)
        throws IOException {
    return new Job(status, new JobConf(conf));
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The Job makes a copy of the Configuration so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param ignored
 * @return the {@link Job} , with no connection to a cluster yet.
 * @throws IOException
 * @deprecated Use {@link #getInstance()}
 */
@Deprecated
public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and given
 * {@link Configuration}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The Job makes a copy of the Configuration so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param ignored
 * @param conf job configuration
 * @return the {@link Job} , with no connection to a cluster yet.
 * @throws IOException
 * @deprecated Use {@link #getInstance(Configuration)}
 */
@Deprecated
public static Job getInstance(Cluster ignored, Configuration conf)
        throws IOException {
    return getInstance(conf);
}

/**
 * Creates a new {@link Job} with no particular {@link Cluster} and given
 * {@link Configuration} and {@link JobStatus}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * @param cluster cluster
 * @param status job status
 * @param conf job configuration
 * @return the {@link Job} , with no connection to a cluster yet.
 * @throws IOException
 */
@Private
public static Job getInstance(Cluster cluster, JobStatus status,
        Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
}
As you can see, obtaining a Job instance this way may also involve a Cluster: the @Private overload accepts one directly, while the deprecated Cluster-taking overloads simply ignore it.
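One detail worth demonstrating: as the javadoc above repeats, each factory method copies the supplied Configuration into a fresh JobConf, so mutating the original afterwards does not affect the job. A small sketch of that copy semantics (the queue-name key is just a convenient example property; the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ConfCopyDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.job.queuename", "dev");

        // getInstance() copies conf into a new JobConf ...
        Job job = Job.getInstance(conf, "conf-copy-demo");

        // ... so this later change is invisible to the job.
        conf.set("mapreduce.job.queuename", "prod");
        System.out.println(
            job.getConfiguration().get("mapreduce.job.queuename")); // "dev"
    }
}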
  
Polling-interval methods:



/** The interval at which monitorAndPrintJob() prints status */
public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
        LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
            " has been set to an invalid value; " +
            " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
        progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
}

/** The interval at which waitForCompletion() should check. */
public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
        LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
            " has been set to an invalid value; " +
            "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
        completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
}
The two methods above return, respectively, the interval at which monitorAndPrintJob() prints the job's running status and the interval at which waitForCompletion() checks whether the job has finished.
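Both intervals can be tuned through the job's Configuration before the polling methods run. A small sketch of overriding them and reading back the effective values (the override values are arbitrary; the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PollIntervalDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // waitForCompletion() will now check every 2 s instead of 5 s ...
        conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 2000);
        // ... and monitorAndPrintJob() will print every 500 ms instead of 1 s.
        conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 500);

        System.out.println(Job.getCompletionPollInterval(conf)); // 2000
        System.out.println(Job.getProgressPollInterval(conf));   // 500
    }
}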
  
Methods that need synchronization:



synchronized void ensureFreshStatus() throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
        updateStatus();
    }
}

/** Some methods need to update status immediately. So, refresh
 * immediately
 * @throws IOException
 */
synchronized void updateStatus() throws IOException {
    try {
        this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
            @Override
            public JobStatus run() throws IOException, InterruptedException {
                return cluster.getClient().getJobStatus(status.getJobID());
            }
        });
    }
    catch (InterruptedException ie) {
        throw new IOException(ie);
    }
    if (this.status == null) {
        throw new IOException("Job status not available ");
    }
    this.statustime = System.currentTimeMillis();
}

private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
        cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run()
                    throws IOException, InterruptedException,
                           ClassNotFoundException {
                return new Cluster(getConfiguration());
            }
        });
    }
}
  
Methods for setting configuration parameters:
  



/**
 * Set the number of reduce tasks for the job.
 * @param tasks the number of reduce tasks
 * @throws IllegalStateException if the job is submitted
 */
public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
}

/**
 * Set the current working directory for the default file system.
 *
 * @param dir the new current working directory.
 * @throws IllegalStateException if the job is submitted
 */
public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
}

/**
 * Set the {@link InputFormat} for the job.
 * @param cls the InputFormat to use
 * @throws IllegalStateException if the job is submitted
 */
public void setInputFormatClass(Class<? extends InputFormat> cls
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
}

/**
 * Set the job jar
 */
public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
}

/**
 * Set the reported username for this job.
 *
 * @param user the username for this job.
 */
public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
}

/**
 * Set the combiner class for the job.
 * @param cls the combiner to use
 * @throws IllegalStateException if the job is submitted
 */
public void setCombinerClass(Class<? extends Reducer> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}

/**
 * Set the key class for the map output data. This allows the user to
 * specify the map output key class to be different than the final output
 * key class.
 *
 * @param theClass the map output key class.
 * @throws IllegalStateException if the job is submitted
 */
public void setMapOutputKeyClass(Class<?> theClass
                                 ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
}

/**
 * Set the value class for the map output data. This allows the user to
 * specify the map output value class to be different than the final output
 * value class.
 *
 * @param theClass the map output value class.
 * @throws IllegalStateException if the job is submitted
 */
public void setMapOutputValueClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
}

/**
 * Set the key class for the job output data.
 *
 * @param theClass the key class for the job output data.
 * @throws IllegalStateException if the job is submitted
 */
public void setOutputKeyClass(Class<?> theClass
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
}

/**
 * Set the value class for job outputs.
 *
 * @param theClass the value class for job outputs.
 * @throws IllegalStateException if the job is submitted
 */
public void setOutputValueClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
}

/**
 * Define the comparator that controls how the keys are sorted before they
 * are passed to the {@link Reducer}.
 * @param cls the raw comparator
 * @throws IllegalStateException if the job is submitted
 */
public void setSortComparatorClass(Class<? extends RawComparator> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
}
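All of these setters funnel through ensureState(JobState.DEFINE), which is what produces the IllegalStateException mentioned in the overview once a job has been submitted. A minimal sketch of that failure mode; it assumes the job is otherwise fully configured (mapper, input/output paths) and a cluster is reachable, so take it as an illustration of the state machine rather than a runnable job:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class DefineStateDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "define-state-demo");

        // DEFINE state: configuration calls succeed.
        job.setNumReduceTasks(2);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.submit();             // state becomes JobState.RUNNING

        // RUNNING state: ensureState(JobState.DEFINE) now fails.
        job.setNumReduceTasks(4); // throws IllegalStateException
    }
}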
