// Create a new Job; Job.getInstance copies the given Configuration
// (the Job(Configuration) constructor is deprecated).
Job job = Job.getInstance(new Configuration());
job.setJarByClass(MyJob.class);
// Specify various job-specific parameters
job.setJobName("myjob");
// Input/output locations are configured through the file-based input and
// output formats rather than through setter methods on Job itself.
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path("in"));
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path("out"));
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
// Submit the job, then poll for progress until the job is complete;
// the returned flag tells whether the job succeeded.
boolean succeeded = job.waitForCompletion(true);

基本结构:
Job类在org.apache.hadoop.mapreduce包中,继承了JobContextImpl类以及实现了JobContext接口。
Job定义的静态常量:
private static final Log LOG = LogFactory.getLog(Job.class);

/**
 * Lifecycle state of a job: DEFINE while it is still being configured (the
 * setters call ensureState(JobState.DEFINE)); RUNNING presumably once the job
 * has been submitted.
 */
@InterfaceStability.Evolving
public enum JobState {DEFINE, RUNNING}

/** Maximum age of a cached job status; presumably milliseconds (2 s) -- confirm against its users. */
private static final long MAX_JOBSTATUS_AGE = 1000 * 2;

/** Configuration key for the client-side output filter. */
public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";

/** Key in mapred-*.xml that sets completionPollIntervalMillis. */
public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";
/** Default completionPollIntervalMillis is 5000 ms. */
static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;

/** Key in mapred-*.xml that sets progMonitorPollIntervalMillis. */
public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
/** Default progMonitorPollIntervalMillis is 1000 ms. */
static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

/** Configuration key that presumably flags whether GenericOptionsParser was used. */
public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
/** Configuration key presumably controlling replication of files submitted with the job. */
public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
/** Configuration key for the task-log pull timeout. */
private static final String TASKLOG_PULL_TIMEOUT_KEY =
    "mapreduce.client.tasklog.timeout";
/** Default task-log pull timeout; presumably milliseconds (60 s) -- confirm. */
private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
Job定义的私有变量:
// Lifecycle state; starts as DEFINE, the state the configuration setters
// require via ensureState(JobState.DEFINE).
private JobState state = JobState.DEFINE;
// Last known status of the job; no initializer here, so null unless a
// constructor assigns it (constructors not shown -- TODO confirm).
private JobStatus status;
// NOTE(review): presumably the time at which `status` was last refreshed,
// checked against MAX_JOBSTATUS_AGE -- confirm against its users.
private long statustime;
// Cluster connection; assigned via setCluster(...), otherwise (per the
// getInstance docs) created from the configuration only when needed.
private Cluster cluster;
创建Job实例的静态工厂方法(getInstance的各个重载):
/**
 * Returns a fresh {@link Job} backed by a default {@link Configuration}.
 *
 * No {@link Cluster} is attached yet; one is created from the configuration
 * only when it is first needed.
 *
 * @return a new {@link Job}, not yet connected to any cluster
 * @throws IOException propagated from {@link #getInstance(Configuration)}
 */
public static Job getInstance() throws IOException {
  // Default configuration, no Cluster attached.
  final Configuration defaults = new Configuration();
  return getInstance(defaults);
}
/**
 * Returns a fresh {@link Job} built from a private copy of {@code conf}.
 *
 * Because the Job works on its own {@link JobConf} copy, any internal
 * adjustments it makes are not visible through the caller's
 * {@link Configuration} object. No {@link Cluster} is attached yet; one is
 * created from the configuration only when it is needed.
 *
 * @param conf the configuration to copy
 * @return a new {@link Job}, not yet connected to any cluster
 * @throws IOException if the job cannot be constructed
 */
public static Job getInstance(Configuration conf) throws IOException {
  // Copy the configuration and build the job without a Cluster.
  return new Job(new JobConf(conf));
}
/**
 * Returns a fresh {@link Job}, built from a copy of {@code conf}, with its
 * name set to {@code jobName}.
 *
 * The caller's {@link Configuration} is copied, so internal modifications
 * made by the Job do not leak back into it. No {@link Cluster} is attached
 * yet; one is created from the configuration only when it is needed.
 *
 * @param conf the configuration to copy
 * @param jobName the name to give the job
 * @return a new named {@link Job}, not yet connected to any cluster
 * @throws IOException propagated from {@link #getInstance(Configuration)}
 */
public static Job getInstance(Configuration conf, String jobName)
    throws IOException {
  final Job named = getInstance(conf); // still without a Cluster
  named.setJobName(jobName);
  return named;
}
/**
 * Returns a fresh {@link Job} seeded with an existing {@link JobStatus} and
 * a private copy of {@code conf}.
 *
 * The caller's {@link Configuration} is copied, so internal modifications
 * made by the Job do not leak back into it. No {@link Cluster} is attached
 * yet; one is created from the configuration only when it is needed.
 *
 * @param status the job status to start from
 * @param conf the job configuration to copy
 * @return a new {@link Job}, not yet connected to any cluster
 * @throws IOException if the job cannot be constructed
 */
public static Job getInstance(JobStatus status, Configuration conf)
    throws IOException {
  final JobConf privateCopy = new JobConf(conf);
  return new Job(status, privateCopy);
}
/**
 * Returns a fresh {@link Job} backed by a default {@link Configuration}.
 *
 * The {@link Cluster} argument is not used; a cluster is created from the
 * configuration only when it is needed.
 *
 * @param ignored unused; kept only for source compatibility
 * @return a new {@link Job}, not yet connected to any cluster
 * @throws IOException propagated from {@link #getInstance(Configuration)}
 * @deprecated Use {@link #getInstance()}
 */
@Deprecated
public static Job getInstance(Cluster ignored) throws IOException {
  // Same as getInstance(): a default configuration and no Cluster.
  return getInstance(new Configuration());
}
/**
 * Returns a fresh {@link Job} built from a private copy of {@code conf}.
 *
 * The {@link Cluster} argument is not used; a cluster is created from the
 * configuration only when it is needed. The caller's {@link Configuration}
 * is copied, so internal modifications made by the Job do not leak back
 * into it.
 *
 * @param ignored unused; kept only for source compatibility
 * @param conf the job configuration to copy
 * @return a new {@link Job}, not yet connected to any cluster
 * @throws IOException propagated from {@link #getInstance(Configuration)}
 * @deprecated Use {@link #getInstance(Configuration)}
 */
@Deprecated
public static Job getInstance(Cluster ignored, Configuration conf)
    throws IOException {
  final Job fresh = getInstance(conf); // the Cluster argument is deliberately dropped
  return fresh;
}
/**
 * Returns a fresh {@link Job} seeded with {@code status} and a private copy
 * of {@code conf}, and attaches the supplied {@link Cluster} to it.
 *
 * The caller's {@link Configuration} is copied, so internal modifications
 * made by the Job do not leak back into it.
 *
 * @param cluster the cluster to attach to the new job
 * @param status the job status to start from
 * @param conf the job configuration to copy
 * @return a new {@link Job} with {@code cluster} set on it
 * @throws IOException propagated from {@link #getInstance(JobStatus, Configuration)}
 */
@Private
public static Job getInstance(Cluster cluster, JobStatus status,
    Configuration conf) throws IOException {
  final Job attached = getInstance(status, conf);
  attached.setCluster(cluster);
  return attached;
}
可见,通过getInstance获取Job实例时,通常并不会立即创建Cluster,而是在需要时才根据配置创建;其中两个已废弃的重载会忽略传入的Cluster参数,只有最后一个重载会把传入的Cluster显式设置到Job上。
获取轮询周期(间隔)的方法:
/**
 * Returns the interval, in milliseconds, at which monitorAndPrintJob() prints status.
 *
 * Reads {@link #PROGRESS_MONITOR_POLL_INTERVAL_KEY} from {@code conf}. The default is
 * {@link #DEFAULT_MONITOR_POLL_INTERVAL} (1000 ms). A configured value below 1 is
 * rejected: a warning is logged and the default is used instead.
 *
 * @param conf the configuration to read the interval from
 * @return a poll interval of at least 1 millisecond
 */
public static int getProgressPollInterval(Configuration conf) {
  int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
  if (progMonitorPollIntervalMillis < 1) {
    // Single space after the semicolon (the previous text produced "value;  replacing"),
    // matching the message logged by getCompletionPollInterval.
    LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
    progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
  }
  return progMonitorPollIntervalMillis;
}
/**
 * Returns how often, in milliseconds, waitForCompletion() checks whether the
 * job has finished.
 *
 * Reads {@link #COMPLETION_POLL_INTERVAL_KEY} from {@code conf}, falling back to
 * {@link #DEFAULT_COMPLETION_POLL_INTERVAL}. Values below 1 trigger a warning and
 * are replaced by that default.
 *
 * @param conf the configuration to read the interval from
 * @return a poll interval of at least 1 millisecond
 */
public static int getCompletionPollInterval(Configuration conf) {
  final int configured =
      conf.getInt(COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
  if (configured >= 1) {
    return configured;
  }
  LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
      " has been set to an invalid value; "
      + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
  return DEFAULT_COMPLETION_POLL_INTERVAL;
}
上面两个方法分别返回monitorAndPrintJob()打印Job运行状态的轮询间隔(默认1000毫秒),以及waitForCompletion()检查Job是否完成的轮询间隔(默认5000毫秒);如果配置值小于1,会记录一条警告并回退到默认值。
/**
 * Sets how many reduce tasks the job will run.
 *
 * Only allowed while the job is still in the {@link JobState#DEFINE} state.
 *
 * @param tasks the number of reduce tasks
 * @throws IllegalStateException if the job has already been submitted
 */
public void setNumReduceTasks(int tasks) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  this.conf.setNumReduceTasks(tasks);
}
/**
 * Changes the current working directory used for the default file system.
 *
 * Only allowed while the job is still in the {@link JobState#DEFINE} state.
 *
 * @param dir the new working directory
 * @throws IllegalStateException if the job has already been submitted
 * @throws IOException as declared by this method's signature
 */
public void setWorkingDirectory(Path dir) throws IOException {
  ensureState(JobState.DEFINE);
  this.conf.setWorkingDirectory(dir);
}
/**
 * Set the {@link InputFormat} for the job.
 *
 * The class is stored in the job configuration under
 * {@code INPUT_FORMAT_CLASS_ATTR}. {@code Configuration.setClass} checks it
 * against {@link InputFormat} and rejects classes that do not implement it.
 *
 * @param cls the InputFormat to use
 * @throws IllegalStateException if the job is submitted
 */
public void setInputFormatClass(Class cls) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  // Record the input format itself; calling setJarByClass here would only
  // pick the job jar and leave the input format unset.
  conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
}
/**
 * Points the job at the jar file that holds its classes.
 *
 * Only allowed while the job is still in the {@link JobState#DEFINE} state.
 *
 * @param jar the job jar
 * @throws IllegalStateException if the job has already been submitted
 */
public void setJar(String jar) {
  ensureState(JobState.DEFINE);
  this.conf.setJar(jar);
}
/**
 * Records the username this job reports itself as running under.
 *
 * Only allowed while the job is still in the {@link JobState#DEFINE} state.
 *
 * @param user the username for this job
 * @throws IllegalStateException if the job has already been submitted
 */
public void setUser(String user) {
  ensureState(JobState.DEFINE);
  this.conf.setUser(user);
}
/**
 * Set the combiner class for the job.
 *
 * The class is stored in the job configuration under
 * {@code COMBINE_CLASS_ATTR}. {@code Configuration.setClass} checks it against
 * {@link Reducer}, the type a combiner must extend.
 *
 * @param theClass the combiner to use
 * @throws IllegalStateException if the job is submitted
 */
public void setCombinerClass(Class theClass) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  // Record the combiner; calling setMapOutputKeyClass here would leave the
  // combiner unset and overwrite the map output key class instead.
  conf.setClass(COMBINE_CLASS_ATTR, theClass, Reducer.class);
}
/**
 * Declares the type of the values emitted by the map phase, which may differ
 * from the job's final output value class.
 *
 * Only allowed while the job is still in the {@link JobState#DEFINE} state.
 *
 * @param theClass the map output value class
 * @throws IllegalStateException if the job has already been submitted
 */
public void setMapOutputValueClass(Class theClass) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  this.conf.setMapOutputValueClass(theClass);
}
/**
 * Declares the type of the keys in the job's final output.
 *
 * Only allowed while the job is still in the {@link JobState#DEFINE} state.
 *
 * @param theClass the job output key class
 * @throws IllegalStateException if the job has already been submitted
 */
public void setOutputKeyClass(Class theClass) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  this.conf.setOutputKeyClass(theClass);
}
/**
 * Declares the type of the values in the job's final output.
 *
 * Only allowed while the job is still in the {@link JobState#DEFINE} state.
 *
 * @param theClass the job output value class
 * @throws IllegalStateException if the job has already been submitted
 */
public void setOutputValueClass(Class theClass) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  this.conf.setOutputValueClass(theClass);
}
/**
* Define the comparator that controls how the keys are sorted before they
* are passed to the {@link Reducer}.
* @param cls the raw comparator
* @throws IllegalStateException if the job is submitted
*/
public void setSortComparatorClass(Class