[Experience Share] Hadoop Series 4: Using MapReduce (Part 2)
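What follows is the full source of a locally modified org.apache.hadoop.mapred.YARNRunner. Compared with the stock Hadoop class, two places are changed (both flagged with "MODIFIED" comments below): the ApplicationMaster launch command is hard-coded to $JAVA_HOME/bin/java instead of the cross-platform environment expansion, and every value in the AM container environment is passed through a new getLinux() helper that rewrites Windows syntax (%VAR%, ';', '\') into Linux syntax ($VAR, ':', '/'). The usual reason for this patch is submitting a MapReduce job to a Linux YARN cluster straight from a Windows development machine; to apply it, the class is typically placed in your own project under the same package name so that it takes precedence over the copy inside the Hadoop client jar.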
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
import org.apache.hadoop.yarn.util.ConverterUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
  
/**
 * This class enables the current JobClient (0.22 hadoop) to run on YARN.
 */
@SuppressWarnings("unchecked")
public class YARNRunner implements ClientProtocol {

  
  private static final Log LOG = LogFactory.getLog(YARNRunner.class);

  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
  private ResourceMgrDelegate resMgrDelegate;
  private ClientCache clientCache;
  private Configuration conf;
  private final FileContext defaultFileContext;

  /**
   * Yarn runner encapsulates the client interface of YARN.
   *
   * @param conf the configuration object for the client
   */
  public YARNRunner(Configuration conf) {
    this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  /**
   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
   * {@link ResourceMgrDelegate}. Enables mocking and testing.
   *
   * @param conf the configuration object for the client
   * @param resMgrDelegate the resourcemanager client handle.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
    this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
  }

  /**
   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
   * but allowing injecting {@link ClientCache}. Enables mocking and testing.
   *
   * @param conf the configuration object
   * @param resMgrDelegate the resource manager delegate
   * @param clientCache the client cache object.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate, ClientCache clientCache) {
    this.conf = conf;
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }

  @Private
  /**
   * Used for testing mostly.
   * @param resMgrDelegate the resource manager delegate to set to.
   */
  public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
    this.resMgrDelegate = resMgrDelegate;
  }

  @Override
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0) throws IOException, InterruptedException {
    throw new UnsupportedOperationException("Use Token.renew instead");
  }

  @Override
  public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
    return resMgrDelegate.getActiveTrackers();
  }

  @Override
  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
    return resMgrDelegate.getAllJobs();
  }

  @Override
  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
    return resMgrDelegate.getBlacklistedTrackers();
  }

  @Override
  public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
    return resMgrDelegate.getClusterMetrics();
  }

  @VisibleForTesting
  void addHistoryToken(Credentials ts) throws IOException, InterruptedException {
    /* check if we have a hsproxy, if not, no need */
    MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
    if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
      /*
       * Note that getDelegationToken was called. Again this is a hack for
       * oozie to make sure we add history server delegation tokens to the
       * credentials.
       */
      RMDelegationTokenSelector tokenSelector = new RMDelegationTokenSelector();
      Text service = resMgrDelegate.getRMDelegationTokenService();
      if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
        Text hsService = SecurityUtil.buildTokenService(hsProxy.getConnectAddress());
        if (ts.getToken(hsService) == null) {
          ts.addToken(hsService, getDelegationTokenFromHS(hsProxy));
        }
      }
    }
  }

  @VisibleForTesting
  Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy) throws IOException, InterruptedException {
    GetDelegationTokenRequest request = recordFactory.newRecordInstance(GetDelegationTokenRequest.class);
    request.setRenewer(Master.getMasterPrincipal(conf));
    org.apache.hadoop.yarn.api.records.Token mrDelegationToken;
    mrDelegationToken = hsProxy.getDelegationToken(request).getDelegationToken();
    return ConverterUtils.convertFromYarn(mrDelegationToken, hsProxy.getConnectAddress());
  }

  @Override
  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, InterruptedException {
    // The token is only used for serialization. So the type information
    // mismatch should be fine.
    return resMgrDelegate.getDelegationToken(renewer);
  }

  @Override
  public String getFilesystemName() throws IOException, InterruptedException {
    return resMgrDelegate.getFilesystemName();
  }

  @Override
  public JobID getNewJobID() throws IOException, InterruptedException {
    return resMgrDelegate.getNewJobID();
  }

  @Override
  public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
    return resMgrDelegate.getQueue(queueName);
  }

  @Override
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
    return resMgrDelegate.getQueueAclsForCurrentUser();
  }

  @Override
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return resMgrDelegate.getQueues();
  }

  @Override
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return resMgrDelegate.getRootQueues();
  }

  @Override
  public QueueInfo[] getChildQueues(String parent) throws IOException, InterruptedException {
    return resMgrDelegate.getChildQueues(parent);
  }

  @Override
  public String getStagingAreaDir() throws IOException, InterruptedException {
    return resMgrDelegate.getStagingAreaDir();
  }

  @Override
  public String getSystemDir() throws IOException, InterruptedException {
    return resMgrDelegate.getSystemDir();
  }

  @Override
  public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
    return resMgrDelegate.getTaskTrackerExpiryInterval();
  }

  @Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {

    addHistoryToken(ts);

    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);
      String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " + diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }
  

  
  private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type) throws IOException {
    LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
    FileStatus rsrcStat = fs.getFileStatus(p);
    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs.getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
    rsrc.setSize(rsrcStat.getLen());
    rsrc.setTimestamp(rsrcStat.getModificationTime());
    rsrc.setType(type);
    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    return rsrc;
  }

  public ApplicationSubmissionContext createApplicationSubmissionContext(Configuration jobConf, String jobSubmitDir, Credentials ts) throws IOException {
    ApplicationId applicationId = resMgrDelegate.getApplicationId();

    // Setup resource requirements
    Resource capability = recordFactory.newRecordInstance(Resource.class);
    capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
    capability.setVirtualCores(conf.getInt(MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES));
    LOG.debug("AppMaster capability = " + capability);

    // Setup LocalResources
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

    Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);

    URL yarnUrlForJobSubmitDir = ConverterUtils.getYarnUrlFromPath(
        defaultFileContext.getDefaultFileSystem().resolvePath(
            defaultFileContext.makeQualified(new Path(jobSubmitDir))));
    LOG.debug("Creating setup context, jobSubmitDir url is " + yarnUrlForJobSubmitDir);

    localResources.put(MRJobConfig.JOB_CONF_FILE,
        createApplicationResource(defaultFileContext, jobConfPath, LocalResourceType.FILE));
    if (jobConf.get(MRJobConfig.JAR) != null) {
      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
      LocalResource rc = createApplicationResource(
          FileContext.getFileContext(jobJarPath.toUri(), jobConf), jobJarPath, LocalResourceType.PATTERN);
      String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
      rc.setPattern(pattern);
      localResources.put(MRJobConfig.JOB_JAR, rc);
    } else {
      // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
      // mapreduce jar itself which is already on the classpath.
      LOG.info("Job jar is not present. " + "Not adding any jar to the list of resources.");
    }

    // TODO gross hack
    for (String s : new String[] { MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO }) {
      localResources.put(MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
          createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s), LocalResourceType.FILE));
    }

    // Setup security tokens
    DataOutputBuffer dob = new DataOutputBuffer();
    ts.writeTokenStorageToStream(dob);
    ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    // vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
    //     + "/bin/java");
    // MODIFIED: instead of the cross-platform expansion above, hard-code the
    // Linux java path so that a job submitted from Windows still launches
    // correctly on a Linux NodeManager.
    System.out.println(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME) + "/bin/java");
    vargs.add("$JAVA_HOME/bin/java");

    // TODO: why do we use 'conf' some places and 'jobConf' others?
    long logSize = jobConf.getLong(MRJobConfig.MR_AM_LOG_KB, MRJobConfig.DEFAULT_MR_AM_LOG_KB) << 10;
    String logLevel = jobConf.get(MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
    int numBackups = jobConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS, MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS);
    MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);

    // Check for Java Lib Path usage in MAP and REDUCE configs
    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""), "map", MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""), "map", MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""), "reduce", MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""), "reduce", MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);

    // Add AM admin command opts before user command opts
    // so that it can be overridden by user
    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterAdminOptions, "app master", MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
    vargs.add(mrAppMasterAdminOptions);

    // Add AM user command opts
    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterUserOptions, "app master", MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
    vargs.add(mrAppMasterUserOptions);

    if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE, MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
      final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS, MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
      if (profileParams != null) {
        vargs.add(String.format(profileParams,
            ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + TaskLog.LogName.PROFILE));
      }
    }

    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT);
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR);

    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    LOG.debug("Command to launch container for ApplicationMaster is : " + mergedCommand);

    // Setup the CLASSPATH in environment
    // i.e. add { Hadoop jars, job jar, CWD } to classpath
    Map<String, String> environment = new HashMap<String, String>();
    MRApps.setClasspath(environment, conf);

    // Shell
    environment.put(Environment.SHELL.name(), conf.get(MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));

    // Add the container working directory at the front of LD_LIBRARY_PATH
    MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
        MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);

    // Setup the environment variables for Admin first
    MRApps.setEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), conf);
    // Setup the environment variables (LD_LIBRARY_PATH, etc)
    MRApps.setEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ENV), conf);

    // Parse distributed cache
    MRApps.setupDistributedCache(jobConf, localResources);

    Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>(2);
    acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
    acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));

    // MODIFIED (TODO BY DHT): rewrite every environment value from Windows
    // syntax to Linux syntax before handing it to the AM container.
    for (String key : environment.keySet()) {
      String org = environment.get(key);
      String linux = getLinux(org);
      environment.put(key, linux);
    }
    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
        localResources, environment, vargsFinal, null, securityTokens, acls);

    Collection<String> tagsFromConf = jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
    appContext.setApplicationId(applicationId); // ApplicationId
    appContext.setQueue( // Queue name
        jobConf.get(JobContext.QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME));
    // add reservationID if present
    ReservationId reservationID = null;
    try {
      reservationID = ReservationId.parseReservationId(jobConf.get(JobContext.RESERVATION_ID));
    } catch (NumberFormatException e) {
      // throw an exception as the reservationId is invalid
      String errMsg = "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
          + " specified for the app: " + applicationId;
      LOG.warn(errMsg);
      throw new IOException(errMsg);
    }
    if (reservationID != null) {
      appContext.setReservationID(reservationID);
      LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId + " to queue:"
          + appContext.getQueue() + " with reservationId:" + appContext.getReservationID());
    }
    appContext.setApplicationName( // Job name
        jobConf.get(JobContext.JOB_NAME, YarnConfiguration.DEFAULT_APPLICATION_NAME));
    appContext.setCancelTokensWhenComplete(conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
    appContext.setAMContainerSpec(amContainer); // AM Container
    appContext.setMaxAppAttempts(conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
    appContext.setResource(capability);
    appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
    if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
      appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
    }

    return appContext;
  }
  

  
  /**
   * MODIFIED: converts a Windows-style environment value to Linux style:
   * %VAR% becomes $VAR, ';' becomes ':', and '\' becomes '/'.
   *
   * @param org the original Windows-style value
   * @return the converted Linux-style value
   */
  private String getLinux(String org) {
    StringBuilder sb = new StringBuilder();
    int c = 0;
    for (int i = 0; i < org.length(); i++) {
      if (org.charAt(i) == '%') {
        c++;
        if (c % 2 == 1) {
          // the opening '%' of a %VAR% pair becomes '$'; the closing '%' is dropped
          sb.append("$");
        }
      } else {
        switch (org.charAt(i)) {
        case ';':
          sb.append(":");
          break;
        case '\\':
          sb.append("/");
          break;
        default:
          sb.append(org.charAt(i));
          break;
        }
      }
    }
    return (sb.toString());
  }
  

  
  @Override
  public void setJobPriority(JobID arg0, String arg1) throws IOException, InterruptedException {
    resMgrDelegate.setJobPriority(arg0, arg1);
  }

  @Override
  public long getProtocolVersion(String arg0, long arg1) throws IOException {
    return resMgrDelegate.getProtocolVersion(arg0, arg1);
  }

  @Override
  public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0) throws IOException, InterruptedException {
    throw new UnsupportedOperationException("Use Token.renew instead");
  }

  @Override
  public Counters getJobCounters(JobID arg0) throws IOException, InterruptedException {
    return clientCache.getClient(arg0).getJobCounters(arg0);
  }

  @Override
  public String getJobHistoryDir() throws IOException, InterruptedException {
    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
  }

  @Override
  public JobStatus getJobStatus(JobID jobID) throws IOException, InterruptedException {
    JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
    return status;
  }

  @Override
  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2) throws IOException, InterruptedException {
    return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
  }

  @Override
  public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException, InterruptedException {
    return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
  }

  @Override
  public TaskReport[] getTaskReports(JobID jobID, TaskType taskType) throws IOException, InterruptedException {
    return clientCache.getClient(jobID).getTaskReports(jobID, taskType);
  }

  private void killUnFinishedApplication(ApplicationId appId) throws IOException {
    ApplicationReport application = null;
    try {
      application = resMgrDelegate.getApplicationReport(appId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
    if (application.getYarnApplicationState() == YarnApplicationState.FINISHED
        || application.getYarnApplicationState() == YarnApplicationState.FAILED
        || application.getYarnApplicationState() == YarnApplicationState.KILLED) {
      return;
    }
    killApplication(appId);
  }

  private void killApplication(ApplicationId appId) throws IOException {
    try {
      resMgrDelegate.killApplication(appId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

  private boolean isJobInTerminalState(JobStatus status) {
    return status.getState() == JobStatus.State.KILLED
        || status.getState() == JobStatus.State.FAILED
        || status.getState() == JobStatus.State.SUCCEEDED;
  }

  @Override
  public void killJob(JobID arg0) throws IOException, InterruptedException {
    /* check if the status is not running, if not send kill to RM */
    JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
    ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();

    // get status from RM and return
    if (status == null) {
      killUnFinishedApplication(appId);
      return;
    }

    if (status.getState() != JobStatus.State.RUNNING) {
      killApplication(appId);
      return;
    }

    try {
      /* send a kill to the AM */
      clientCache.getClient(arg0).killJob(arg0);
      long currentTimeMillis = System.currentTimeMillis();
      long timeKillIssued = currentTimeMillis;
      while ((currentTimeMillis < timeKillIssued + 10000L) && !isJobInTerminalState(status)) {
        try {
          Thread.sleep(1000L);
        } catch (InterruptedException ie) {
          /* interrupted, just break */
          break;
        }
        currentTimeMillis = System.currentTimeMillis();
        status = clientCache.getClient(arg0).getJobStatus(arg0);
        if (status == null) {
          killUnFinishedApplication(appId);
          return;
        }
      }
    } catch (IOException io) {
      LOG.debug("Error when checking for application status", io);
    }
    if (status != null && !isJobInTerminalState(status)) {
      killApplication(appId);
    }
  }

  @Override
  public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException, InterruptedException {
    return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
  }

  @Override
  public AccessControlList getQueueAdmins(String arg0) throws IOException {
    return new AccessControlList("*");
  }

  @Override
  public JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
    return JobTrackerStatus.RUNNING;
  }

  @Override
  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
  }

  @Override
  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException {
    return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
  }

  private static void warnForJavaLibPath(String opts, String component, String javaConf, String envConf) {
    if (opts != null && opts.contains("-Djava.library.path")) {
      LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause "
          + "programs to no longer function if hadoop native libraries "
          + "are used. These values should be set as part of the "
          + "LD_LIBRARY_PATH in the " + component + " JVM env using " + envConf
          + " config settings.");
    }
  }
}
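
To sanity-check the conversion logic in isolation, the getLinux() loop can be copied into a tiny standalone class. This is only an illustrative sketch: GetLinuxDemo and the sample CLASSPATH value below are made up for demonstration and are not part of the original post.

public class GetLinuxDemo {

  // Standalone copy of the getLinux() conversion above, for demonstration only.
  static String getLinux(String org) {
    StringBuilder sb = new StringBuilder();
    int c = 0;
    for (int i = 0; i < org.length(); i++) {
      char ch = org.charAt(i);
      if (ch == '%') {
        c++;
        if (c % 2 == 1) {
          sb.append('$'); // opening '%' of a %VAR% pair becomes '$';
        }                 // the closing '%' is simply dropped
      } else if (ch == ';') {
        sb.append(':');   // Windows path-list separator becomes the Linux one
      } else if (ch == '\\') {
        sb.append('/');   // backslash becomes forward slash
      } else {
        sb.append(ch);
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // A made-up Windows-style CLASSPATH value:
    String win = "%PWD%;%HADOOP_CONF_DIR%;%HADOOP_COMMON_HOME%\\share\\hadoop\\common\\*";
    // Prints: $PWD:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*
    System.out.println(getLinux(win));
  }
}

Note that the counter c runs over the whole string, so the conversion assumes '%' only ever appears in well-formed %VAR% pairs; a single stray '%' would shift every later pair the wrong way.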
