设为首页 收藏本站
查看: 1830|回复: 0

[经验分享] 第七章:小朱笔记hadoop之源码分析-hdfs分析 第四节:namenode分析-namenode启动过程分析

[复制链接]

尚未签到

发表于 2016-12-13 10:54:46 | 显示全部楼层 |阅读模式
第七章:小朱笔记hadoop之源码分析-hdfs分析

第四节:namenode分析

4.1 namenode启动过程分析

       org.apache.hadoop.hdfs.server.namenode.main 方法是系统的入口,它会调用 createNameNode 创建 NameNode 实例。 createNameNode 分析命令行参数,如果是 FORMAT 戒 FINALIZE,调用对应的方法后退出,如果是其他的参数,将创建NameNode 对象。NameNode的构造函数会调initialize,初始化NameNode的成员发量,包括创建 RPC 服务器,初始化FSNamesystem,初始化RPC服务器和回收站线程。
   创建的服务如下:
  

服务                             类                           
server                   ipc.RPC.Server      
serviceRpcServer         ipc.RPC.Server  
HttpServer               http.HttpServer  
Trash Emptier            fs.Trash.Trash.Emptier  
hbthread                 hdfs.server.namenode.FSNamesystem.HeartbeatMonitor   
lmthread                 hdfs.server.namenode.LeaseManager.Monitor  
replthread               hdfs.server.namenode.FSNamesystem.ReplicationMonitor  
dnthread                 hdfs.server.namenode.DecommissionManager.Monitor
 初始化 name-node 入口:

/**
* Initialize name-node.
*  
* @param conf the configuration
*/  
private void initialize(Configuration conf) throws IOException {  
InetSocketAddress socAddr = NameNode.getAddress(conf);  
//从配置conf中获取到Namenode服务器所使用的Socket地址 读取fs.default.name的值,获取hdfs集群的地址   
UserGroupInformation.setConfiguration(conf);  
//设置用户权限信息  
//如果dfs.namenode.keytab.file存在,并且kerberos已经开启,则调用UserGroupInformation.loginUserFromKeytab进行登陆   
//登陆使用dfs.namenode.kerberos.principal作为用户名,否则使用当前linux的user作为用户。   
SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,   
DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());  
//获取namenode同时处理请求的数量配置,默认为10   服务器上处理器Handler线程的数量   
int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);  
// set service-level authorization security policy  
//如果授权配置(hadoop.security.authorization)开启了,则刷新授权策略   
if (serviceAuthEnabled = conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {  
//默认会重新加载hadoop-policy.xml中的acl配置,加载完成后acl配置保存在  
ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());  
}  
//创建服务指标,用于观察namenode服务状态   
myMetrics = NameNodeInstrumentation.create(conf);// 初始化NameNodeMetrics   
//启动FSNamesystem,启动FSNamesystem时,会启动各种thread执行namenode职责   
//1.init FSNamesystem  
this.namesystem = new FSNamesystem(this, conf);  
//如果安全机制开启   
if (UserGroupInformation.isSecurityEnabled()) {   
//启动守护线程每5秒执行一次ExpiredTokenRemover   
namesystem.activateSecretManager();   
}   
//2.init namenode-datanode,namenode-snn RPC Server  
//创建rpc服务器,如果dfs.namenode.servicerpc-address配置项存在,则用来作为服务器地址  
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);  
if (dnSocketAddr != null) {  
int serviceHandlerCount =  
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,  
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);  
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),   
dnSocketAddr.getPort(), serviceHandlerCount,  
false, conf, namesystem.getDelegationTokenSecretManager());  
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();  
setRpcServiceServerAddress(conf);  
}  
//3.init namenode-client RPC Server  
this.server = RPC.getServer(this, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager());  
// The rpc-server port can be ephemeral... ensure we have the correct info  
this.serverAddress = this.server.getListenerAddress();   
FileSystem.setDefaultUri(conf, getUri(serverAddress));  
LOG.info("Namenode up at: " + this.serverAddress);  

//4. start Http Server  
startHttpServer(conf);  
//5. start namenode-client RPC Server  
this.server.start();     
if (serviceRpcServer != null) {  
serviceRpcServer.start();     
//6. start namenode-datanode,namenode-snn RPC Server  
}  
//7.start TrashEmptier  
//启动垃圾清理守护线程,读取fs.trash.interval的值作为两次清理的时间间隔。默认每60分钟清理一次   
startTrashEmptier(conf);  
}  
 

(1)初始化FSNamesystem

启动FSNamesystem,启动FSNamesystem时,会启动各种thread执行namenode职责。FSNamesystem 的构造函数会调用 initialize 方法,去初始化上面我们分析过的一堆成员发量。几个重要的步骤包括加载 FSImage editlog 设置系统为安全模式,初始化各个工作线程和 HTTP 服务器。


FSNamesystem重要属性:

// Default initial capacity and load factor of map  
public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16;  
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;  
private boolean isPermissionEnabled;//是否打开权限检查,可以通过配置项dfs.permissions来设置。  
//本地文件的用户文件属主和文件组,可以通过hadoop.job.ugi设置,如果没有设置,那么将使用启动HDFS的用户(通过whoami获得)和该用户所在的组(通过groups获得)作为值  
private UserGroupInformation fsOwner;  
private String supergroup;//对应配置项dfs.permissions.supergroup,应用在defaultPermission中,是系统的超级组。  
//缺省权限,缺省用户为fsOwner,缺省用户组为supergroup,缺省权限为0777,可以通过dfs.upgrade.permission修改。  
private PermissionStatus defaultPermission;  
// FSNamesystemMetrics counter variables  
//系统总容量/已使用容量/剩余容量  
private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;  
//系统总连接数,根据DataNode心跳信息跟新  
private int totalLoad = 0;  
boolean isAccessTokenEnabled;  
BlockTokenSecretManager accessTokenHandler;  
private long accessKeyUpdateInterval;  
private long accessTokenLifetime;  
// Scan interval is not configurable.  
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =  
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);  
private DelegationTokenSecretManager dtSecretManager;  
//分别是成员变量pendingReplications(正在复制的数据块),  
//neededReplications(需要复制的数据块)的大小,  
//scheduledReplicationBlocksCount是当前正在处理的复制工作数目  
volatile long pendingReplicationBlocksCount = 0L;  
volatile long corruptReplicaBlocksCount = 0L;  
volatile long underReplicatedBlocksCount = 0L;  
volatile long scheduledReplicationBlocksCount = 0L;  
volatile long excessBlocksCount = 0L;  
volatile long pendingDeletionBlocksCount = 0L;  
//  
// Stores the correct file name hierarchy  
//指向系统使用的FSDirectory对象。  
public FSDirectory dir;  
//  
// Mapping: Block -> { INode, datanodes, self ref }   
// Updated only in response to client-sent information.  
//  
final BlocksMap blocksMap = new BlocksMap(DEFAULT_INITIAL_MAP_CAPACITY,   
DEFAULT_MAP_LOAD_FACTOR);  
//  
// Store blocks-->datanodedescriptor(s) map of corrupt replicas  
//  
//保存损坏(如:校验没通过)的数据块到对应DataNode的关系,CorruptReplicasMap类图如下,类只有一个成员变量,  
//保存Block到一个DatanodeDescriptor的集合的映射和这个映射上的一系列操作  
//  
public CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();  
/**
* Stores the datanode -> block map.   
* <p>
* Done by storing a set of {@link DatanodeDescriptor} objects, sorted by  
* storage id. In order to keep the storage map consistent it tracks  
* all storages ever registered with the namenode.
* A descriptor corresponding to a specific storage id can be
* <ul>  
* <li>added to the map if it is a new storage id;</li>
* <li>updated with a new datanode started as a replacement for the old one  
* with the same storage id; and </li>
* <li>removed if and only if an existing datanode is restarted to serve a
* different storage id.</li>
* </ul> <br>
* The list of the {@link DatanodeDescriptor}s in the map is checkpointed
* in the namespace image file. Only the {@link DatanodeInfo} part is  
* persistent, the list of blocks is restored from the datanode block
* reports.  
* <p>
* Mapping: StorageID -> DatanodeDescriptor
*  
* 保存了StorageID >> DatanodeDescriptor的映射,用于保证DataNode使用的Storage的一致性。
*/  
NavigableMap<String, DatanodeDescriptor> datanodeMap =  new TreeMap<String, DatanodeDescriptor>();  
//  
// Keeps a Collection for every named machine containing  
// blocks that have recently been invalidated and are thought to live  
// on the machine in question.  
// Mapping: StorageID -> ArrayList<Block>  
//  
// 保存了每个DataNode上无效但还存在的数据块(StorageID >> ArrayList<Block>)。  
// 保存了每个DataNode上有效,但需要删除的数据块(StorageID >> TreeSet<Block>),这种情况可能发生在一个DataNode故障后恢复后,  
// 上面的数据块在系统中副本数太多,需要删除一些数据块。  
private Map<String, Collection<Block>> recentInvalidateSets =  new TreeMap<String, Collection<Block>>();  
//  
// Keeps a TreeSet for every named node.  Each treeset contains  
// a list of the blocks that are "extra" at that location.  We'll  
// eventually remove these extras.  
// Mapping: StorageID -> TreeSet<Block>  
//保存Datanode上有效但需要删除的数据块(StorageID -> TreeSet<Block>)比如一个Datanode故障恢复后,上面的数据块在系统中副本数太多,需要删除一些数据块。  
//  
Map<String, Collection<Block>> excessReplicateMap = new TreeMap<String, Collection<Block>>();  
Random r = new Random();  
/**
* Stores a set of DatanodeDescriptor objects.
* This is a subset of {@link #datanodeMap}, containing nodes that are  
* considered alive.
* The {@link HeartbeatMonitor} periodically checks for outdated entries,
* and removes them from the list.
* 所有目前活着的DataNode,线程HeartbeatMonitor会定期检查
*/  
ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();  
/**
* Store set of Blocks that need to be replicated 1 or more times.
* Set of: Block
*  
* 需要进行复制的数据块。UnderReplicatedBlocks的类图如下,它其实是一个数组,数组的下标是优先级(0的优先级最高,如果数据块只有一个副本,它的优先级是0),
* 数组的内容是一个Block集合。UnderReplicatedBlocks提供一些方法,对Block进行增加,修改,查找和删除。
*  
*/  
private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();  
// We also store pending replication-orders.保存正在复制的数据块的相关信息  
private PendingReplicationBlocks pendingReplications;  
public LeaseManager leaseManager = new LeaseManager(this);   
//  
// Threaded object that checks to see if we have been  
// getting heartbeats from all clients.   
//  
Daemon hbthread = null;   // HeartbeatMonitor thread 对应DataNode心跳检查  
public Daemon lmthread = null;   // LeaseMonitor thread 租约检查  
Daemon smmthread = null;  // SafeModeMonitor thread 安全模式检查  
public Daemon replthread = null;  // Replication thread 数据块复制  
private ReplicationMonitor replmon = null; // Replication metrics  
private volatile boolean fsRunning = true; //系统运行标志  
long systemStart = 0;//系统启动时间  
//  The maximum number of replicates we should allow for a single block  
private int maxReplication;  
//  How many outgoing replication streams a given node should have at one time  
private int maxReplicationStreams;  
// MIN_REPLICATION is how many copies we need in place or else we disallow the write  
private int minReplication;  
// Default replication  
private int defaultReplication;  
// Variable to stall new replication checks for testing purposes  
private volatile boolean stallReplicationWork = false;  
// heartbeatRecheckInterval is how often namenode checks for expired datanodes  
private long heartbeatRecheckInterval;  
// heartbeatExpireInterval is how long namenode waits for datanode to report  
// heartbeat  
private long heartbeatExpireInterval;  
//replicationRecheckInterval is how often namenode checks for new replication work  
private long replicationRecheckInterval;  
// default block size of a file  
private long defaultBlockSize = 0;  
// allow appending to hdfs files  
private boolean supportAppends = true;  
/**
* Last block index used for replication work.
*/  
private int replIndex = 0; ///和neededReplications配合,记录下一个进行复制的数据块位置。  
private long missingBlocksInCurIter = 0;  
private long missingBlocksInPrevIter = 0;   
public static FSNamesystem fsNamesystemObject;  
/** NameNode RPC address */  
private InetSocketAddress nameNodeAddress = null; // TODO: name-node has this field, it should be removed here  
//安全模式是这样一种状态,系统处于这个状态时,不接受任何对名字空间的修改,同时也不会对数据块进行复制或删除数据块。  
//NameNode启动的时候会自动进入安全模式,同时也可以手工进入(不会自动离开)。系统启动以后,DataNode会报告目前它拥有的数据块的信息,  
//当系统接收到的Block信息到达一定门槛,同时每个Block都有dfs.replication.min个副本后,系统等待一段时间后就离开安全模式。这个门槛定义的参数包括:  
//dfs.safemode.threshold.pct:接受到的Block的比例,缺省为95%,就是说,必须DataNode报告的数据块数目占总数的95%,才到达门槛;  
//dfs.replication.min:缺省为1,即每个副本都存在系统中;  
//dfs.replication.min:等待时间,缺省为0,单位秒。  
private SafeModeInfo safeMode;  // safe mode information  
//保存了主机名(String)到DatanodeDescriptor数组的映射(Host2NodesMap唯一的成员变量为HashMap<String,DatanodeDescriptor[]> map,它的方法都是对这个map进行操作)。   
private Host2NodesMap host2DataNodeMap = new Host2NodesMap();  

// datanode networktoplogy  
//  定义了HDFS的网络拓扑,网络拓扑对应选择数据块副本的位置很重要。如在一个层次型的网络中,接到同一个交换机的两个节点间的网络速度,  
//  会比跨越多个交换机的两个节点间的速度快,但是,如果某交换机故障,那么它对接到它上面的两个节点会同时有影响,但跨越多个交换机的两个节点,这种影响会小得多  
NetworkTopology clusterMap = new NetworkTopology();  
private DNSToSwitchMapping dnsToSwitchMapping;  
// for block replicas placement  
//用于为数据块备份选择目标,例如,用户写文件时,需要选择一些DataNode,作为数据块的存放位置,这时候就利用它来选择目标地址。  
//chooseTarget是ReplicationTargetChooser中最重要的方法,  
//它通过内部的一个NetworkTopology对象,计算出一个DatanodeDescriptor数组,该数组就是选定的DataNode,同时,顺序就是最佳的数据流顺序  
ReplicationTargetChooser replicator;  
//保存了系统中允许/不允许连接到NameNode的机器列表  
private HostsFileReader hostsReader;   
//  线程句柄,该线程用于检测DataNode上的Decommission进程。例如,某节点被列入到不允许连接到NameNode的机器列表中(HostsFileReader)  
//  那么,该节点会进入Decommission状态,它上面的数据块会被复制到其它节点,复制结束后机器进入DatanodeInfo.AdminStates.DECOMMISSIONED,这台机器就可以从HDFS中撤掉。  
private Daemon dnthread = null;  
//系统能拥有的INode最大数(配置项dfs.max.objects,0为无限制)。  
private long maxFsObjects = 0;          // maximum number of fs objects  
/**
* The global generation stamp for this file system.  
*/  
private final GenerationStamp generationStamp = new GenerationStamp();  
// Ask Datanode only up to this many blocks to delete.  
//  发送给DataNode删除数据块消息中,能包含的最大数据块数。比方说,如果某DataNode上有250个Block需要被删除,而这个参数是100,  
//  那么一共会有3条删除数据块消息消息,前面两条包含了100个数据块,最后一条是50个。  
int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;  
// precision of access times.  
//用于控制文件的access时间的精度,也就是说,小于这个精度的两次对文件访问,后面的那次就不做记录了。  
private long accessTimePrecision = 0;  
private String nameNodeHostName;  
 

   FSNamesystem初始化方法:

 
    private void initialize(NameNode nn, Configuration conf) throws IOException {  
this.systemStart = now();  
setConfigurationParameters(conf);  
//读取配置文件   
dtSecretManager = createDelegationTokenSecretManager(conf);  
//读取dfs.namenode.delegation.token的相关配置   

this.nameNodeAddress = nn.getNameNodeAddress();  
this.registerMBean(conf); // register the MBean for the FSNamesystemStutus  
this.dir = new FSDirectory(this, conf);  
StartupOption startOpt = NameNode.getStartupOption(conf);  
this.dir.loadFSImage(getNamespaceDirs(conf),  
getNamespaceEditsDirs(conf), startOpt);  
//加载namenode持久化在硬盘的信息   

long timeTakenToLoadFSImage = now() - systemStart;  
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");  
NameNode.getNameNodeMetrics().setFsImageLoadTime(timeTakenToLoadFSImage);  
//加载安全模式信息,进入安全模式   
this.safeMode = new SafeModeInfo(conf);  
//将block的总数设置给safemode  
setBlockTotal();  
pendingReplications = new PendingReplicationBlocks(  
conf.getInt("dfs.replication.pending.timeout.sec",   
-1) * 1000L);  
if (isAccessTokenEnabled) {  
accessTokenHandler = new BlockTokenSecretManager(true,  
accessKeyUpdateInterval, accessTokenLifetime);  
}  
//启动心跳监控的线程   
this.hbthread = new Daemon(new HeartbeatMonitor());  
//启动文件租约管理监控的线程   
this.lmthread = new Daemon(leaseManager.new Monitor());  
//启动副本监控的线程   
this.replmon = new ReplicationMonitor();  
this.replthread = new Daemon(replmon);  
hbthread.start();  
lmthread.start();  
replthread.start();  
//读取主机信息黑白名单  
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),  
conf.get("dfs.hosts.exclude",""));  
//启动退役节点监控的线程   
this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(  
conf.getInt("dfs.namenode.decommission.interval", 30),  
conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));  
dnthread.start();  
this.dnsToSwitchMapping = ReflectionUtils.newInstance(  
conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,  
DNSToSwitchMapping.class), conf);  
/* If the dns to swith mapping supports cache, resolve network  
* locations of those hosts in the include list,  
* and store the mapping in the cache; so future calls to resolve
* will be fast.
*/  
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {  
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));  
}  
InetSocketAddress socAddr = NameNode.getAddress(conf);  
this.nameNodeHostName = socAddr.getHostName();  
//将这个类注册到监控系统   
registerWith(DefaultMetricsSystem.INSTANCE);  
}  
 

       从代码中可以看到,FSNamesystem的initialize方法主要的工作是:通过读取配置文件设置成员变量;创建FSDirectory并 loadFSImage;设置系统安全模式;启动一系列的后台线程monitorDaemon。其中对 loadFSImage非常重要。 Namenode会将HDFS的文件和目录元数据存储在一个叫fsimage的二进制文 件中,每次保存fsimage之后到下次保存之间的所有hdfs操作,将会记录在editlog文件中,当editlog达到一定的大小(bytes,由 fs.checkpoint.size参数定义)或从上次保存过后一定时间段过后(sec,由fs.checkpoint.period参数定 义),namenode会重新将内存中对整个HDFS的目录树和文件元数据刷到fsimage文件中。Namenode就是通过这种方式来保证HDFS中 元数据信息的安全性。当namenode重启加载fsimage时,就是按照如下格式协议从文件流中加载元数据信息。
从fsimag的存储格式可以看出,fsimage保存有如下信息:

写道

1. image head,其中包含:
a) imgVersion(int):当前image的版本信息
b) namespaceID(int):用来确保别的HDFS instance中的datanode不会误连上当前NN。
c) numFiles(long):整个文件系统中包含有多少文件和目录
d) genStamp(long):生成该image时的时间戳信息。

2.文件或目录的源数据信息,如果是目录,则包含以下信息:
a)path(String):该目录的路径,如”/user/zhuhui/data”
b)replications(short):副本数(目录虽然没有副本,但这里记录的目录副本数也为3)
c)mtime(long):该目录的修改时间的时间戳信息
d)atime(long):该目录的访问时间的时间戳信息
e)blocksize(long):目录的blocksize都为0
f)numBlocks(int):实际有多少个文件块,目录的该值都为-1,表示该item为目录
g)nsQuota(long):namespace Quota值,若没加Quota限制则为-1
h)dsQuota(long):disk Quota值,若没加限制则也为-1
i)username(String):该目录的所属用户名
j)group(String):该目录的所属组
k)permission(short):该目录的permission信息,如644等,有一个short来记录。

3.如果是文件,则还会额外包含如下信息:
a)blockid(long):属于该文件的block的blockid,
b)numBytes(long):该block的大小
c)genStamp(long):该block的时间戳

 


       当该文件对应的numBlocks数不为1,而是大于1时,表示该文件对应有多个block信息,此时紧接在该fsimage之后的就会有多个 blockid,numBytes和genStamp信息。因此,在namenode启动时,就需要对fsimage按照如下格式进行顺序的加载,以将 fsimage中记录的HDFS元数据信息加载到内存中。
       namenode在加载fsimage过程其实非常简单,就是从fsimage中不停的顺序读取文件和目录的元数据信息,并在内存中构建整个 namespace,此时BlocksMap中每个block对应的datanodes 列表暂时为空。当fsimage加载完毕后,整个HDFS的目录结构在内存中就已经初始化完毕,所缺的就是每个文件对应的block对应的 datanode列表信息。这些信息需要从datanode的blockReport中获取,所以加载fsimage完毕后,namenode进程进入 rpc等待状态,等待所有的datanodes发送blockReports。
    HDFS将fsimage和edits文件内容读入内存,进行合并,填充与INode相关的元数据结构,并将新的元数据内存镜像导出,在磁盘上形成新的 fsimage和edits文件。HDFS同时还接收DataNode的心跳信息,填充与Block和Data Node相关的元数据结构。
    saveNameSpace将元数据写入到磁盘,具体操作步骤:首先将current目录重命名为lastcheckpoint.tmp;然后在创建新的 current目录,并保存文件;最后将lastcheckpoint.tmp重命名为privios.checkpoint。  在加载完成时,初始化了一下四个守护线程:



    //启动心跳监控的线程   
this.hbthread = new Daemon(new HeartbeatMonitor());  
//启动文件租约管理监控的线程   
this.lmthread = new Daemon(leaseManager.new Monitor());  
//启动副本监控的线程   
this.replmon = new ReplicationMonitor();  
this.replthread = new Daemon(replmon);  
hbthread.start();  
lmthread.start();  
replthread.start();  
//启动退役节点监控的线程   
this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(  
conf.getInt("dfs.namenode.decommission.interval", 30),  
conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));  
dnthread.start();  
 (2)初始化RPC服务器
//init namenode-client RPC Server  
this.server = RPC.getServer(this, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager());  
public synchronized void start() {  
responder.start();  
listener.start();  
handlers = new Handler[handlerCount];  
for (int i = 0; i < handlerCount; i++) {  
handlers = new Handler(i);  
handlers.start();  
}  
}  
   server启动了三个(假设handlercount为1)线程,这三个线程分别为listener,responder,handler ,这三个线程之间是有职责关系的:Hadoop的Server采用了Java的NIO,这样的话就不需要为每一个socket连接建立一个线程,读取 socket上的数据。在Server中,只需要一个线程,就可以accept新的连接请求和读取socket上的数据,这个线程,就是 Listener。请求处理线程一般有多个,它们都是Server.Handle类的实例。它们的run方法循环地取出一个Server.Call,调用 Server.call方法,搜集结果并串行化,然后将结果放入Responder队列中。对于处理完的请求,需要将结果写回去,同样,利用NIO,只需 要一个线程,相关的逻辑在Responder里。

(3)启动Trash Emptier 线程 
    //启动垃圾清理守护线程,读取fs.trash.interval的值作为两次清理的时间间隔。默认每60分钟清理一次   
startTrashEmptier(conf);  
private void startTrashEmptier(Configuration conf) throws IOException {  
this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");  
this.emptier.setDaemon(true);  
this.emptier.start();  
}  
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313694-1-1.html 上篇帖子: Hadoop实战-中高级部分 之 HDFS原理 HDFS负载均衡 机架感知 健壮性 文件删除恢复机制 下篇帖子: hadoop作业调优参数整理及原理http://blog.csdn.net/starxu85/article/details/6212205
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表