
[Experience Sharing] Chapter 7: Xiao Zhu's Notes on Hadoop Source Code Analysis - HDFS Analysis: DataNode Startup Process


Posted on 2016-12-13 09:19:11
Chapter 7: Xiao Zhu's Notes on Hadoop Source Code Analysis - HDFS Analysis

Section 5: DataNode Analysis

5.1 DataNode Startup Process

(1) Starting the DataNode from the shell script

start-dfs.sh  
"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR start datanode $dataStartOpt
 
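(2) The main() entry point
main() delegates to secureMain(), which calls createDataNode() to create and start the DataNode and then blocks in datanode.join() until the DataNode thread exits. The DataNode's service loop therefore runs in its own thread while the main thread waits.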

    // The main thread blocks in join() while the DataNode thread runs its service loop
    public static void secureMain(String [] args, SecureResources resources) {
        try {
            LOG.info("start up datanode...");
            if (null != args) {
                for (String arg : args) {
                    LOG.info("arg:" + arg);
                }
            }
            StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
            DataNode datanode = createDataNode(args, null, resources);
            if (datanode != null)
                datanode.join();
        } catch (Throwable e) {
            LOG.error(StringUtils.stringifyException(e));
            System.exit(-1);
        } finally {
            // We need to add System.exit here because either shutdown was called or
            // some disk related conditions like volumes tolerated or volumes required
            // condition was not met. Also, In secure mode, control will go to Jsvc and
            // the process hangs without System.exit.
            LOG.info("Exiting Datanode");
            System.exit(0);
        }
    }
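
The blocking behaviour of secureMain() can be illustrated outside Hadoop. The following is a minimal sketch, not DataNode code: JoinSketch and runAndJoin are hypothetical names, and the worker's loop merely stands in for the DataNode service loop. It shows a caller starting a worker thread and then blocking in join() until the worker finishes.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class JoinSketch {
    /** Starts a worker that performs the given number of loop passes, then blocks until it ends. */
    static int runAndJoin(int iterations) {
        AtomicInteger passes = new AtomicInteger();
        Thread worker = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                passes.incrementAndGet(); // stand-in for one pass of a service loop
            }
        }, "worker");
        worker.start();
        try {
            worker.join(); // the caller blocks here, as secureMain() does in datanode.join()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop waiting if interrupted
        }
        return passes.get();
    }

    public static void main(String[] args) {
        System.out.println("passes = " + runAndJoin(3));
    }
}
```

Because join() only returns once the worker has ended, the caller sees all of the worker's completed passes.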
 
(3) Creating the DataNode instance and starting the DataNode thread
  createDataNode() calls instantiateDataNode() to initialize the DataNode, then runDatanodeDaemon() to run the DataNode thread.

    /** Instantiate & Start a single datanode daemon and wait for it to finish.
     *  If this thread is specifically interrupted, it will stop waiting.
     *  LimitedPrivate for creating secure datanodes
     */
    public static DataNode createDataNode(String args[],
            Configuration conf, SecureResources resources) throws IOException {
        // Initialize the DataNode
        DataNode dn = instantiateDataNode(args, conf, resources);
        // Register with the NameNode, then create and start the DataNode daemon thread
        runDatanodeDaemon(dn);
        return dn;
    }
 
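(4) Instantiating the DataNode
  instantiateDataNode() and makeInstance() proceed as follows:
  Parse the startup arguments.
  If the rack configuration ${dfs.network.script} is set, exit the program.
  Read the DataNode storage directories from ${dfs.data.dir}.
  Call makeInstance() to create the instance.
  makeInstance() checks that the storage directories are valid and constructs the DataNode object.
  The DataNode constructor calls startDataNode(), which performs the actual initialization according to the configuration.
  If startup fails, the constructor calls shutdown() to stop the DataNode.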
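
The directory check in the steps above can be sketched in plain Java. This is an illustration under assumptions, not the Hadoop implementation: DataDirCheckSketch and usableDirs are hypothetical names, and the check uses only java.io.File rather than Hadoop's own disk-checking utilities. It takes a comma-separated list such as the value of ${dfs.data.dir} and keeps only the directories that exist (or can be created) and are readable and writable.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

final class DataDirCheckSketch {
    /** Returns the usable entries of a comma-separated directory list. */
    static List<File> usableDirs(String commaSeparated) {
        List<File> usable = new ArrayList<>();
        for (String name : commaSeparated.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore empty entries such as a trailing comma
            }
            File dir = new File(trimmed);
            // Usable means: it is (or can be made) a directory, and we can read and write it.
            if ((dir.isDirectory() || dir.mkdirs()) && dir.canRead() && dir.canWrite()) {
                usable.add(dir);
            } else {
                System.err.println("Invalid directory, skipping: " + dir);
            }
        }
        return usable;
    }

    public static void main(String[] args) {
        String tmp = System.getProperty("java.io.tmpdir");
        List<File> dirs = usableDirs(tmp + ", ");
        System.out.println(dirs.isEmpty()
                ? "no usable directories, the node would not start"
                : "usable: " + dirs);
    }
}
```

A caller would refuse to create the node when the returned list is empty, which mirrors the idea that a DataNode without any valid storage directory cannot start.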
 

    /**
     * Start a Datanode with specified server sockets for secure environments
     * where they are run with privileged ports and injected from a higher
     * level of capability
     */
    DataNode(final Configuration conf,
            final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
        super(conf);
        SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
                DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
        datanodeObject = this;
        supportAppends = conf.getBoolean("dfs.support.append", false);
        this.userWithLocalPathAccess = conf
                .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
        try {
            startDataNode(conf, dataDirs, resources);
        } catch (IOException ie) {
            shutdown();
            throw ie;
        }
    }
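
The constructor's "start, and on failure shut down and rethrow" pattern can be shown in isolation. The sketch below is an assumption-laden illustration: StartOrShutdownSketch, its start() and shutdown() methods, and the event strings are all hypothetical and only record what happened. It shows that a failure during startup triggers cleanup before the original exception reaches the caller.

```java
import java.io.IOException;

final class StartOrShutdownSketch {
    private final StringBuilder events; // records what happened, for illustration only

    StartOrShutdownSketch(boolean failDuringStart, StringBuilder events) throws IOException {
        this.events = events;
        try {
            start(failDuringStart);
        } catch (IOException e) {
            shutdown(); // release whatever start() managed to acquire
            throw e;    // the caller still sees the original failure
        }
    }

    private void start(boolean fail) throws IOException {
        events.append("open-socket;");
        if (fail) {
            throw new IOException("storage directory is locked");
        }
        events.append("started;");
    }

    private void shutdown() {
        events.append("shutdown;");
    }

    /** Builds one instance and returns the recorded event trace. */
    static String trace(boolean failDuringStart) {
        StringBuilder events = new StringBuilder();
        try {
            new StartOrShutdownSketch(failDuringStart, events);
        } catch (IOException e) {
            events.append("rethrown:" + e.getMessage());
        }
        return events.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace(false));
        System.out.println(trace(true));
    }
}
```

The design choice is that a half-initialized object never escapes: either the constructor completes, or the partially acquired resources are released and the caller gets the exception.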
 
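(5) Creating the DataNode instance: startDataNode()

(a) Determine the local host name and the NameNode address.
(b) Connect to the NameNode; the local DataNode is named "machineName:port".
(c) Fetch the version and ID information from the NameNode.
(d) Initialize the storage directory structure, formatting any directory that has not been formatted yet.
(e) Open the DataNode listening socket ss; the default port is 50010.
(f) Initialize the DataXceiverServer background thread, which accepts requests on ss.
(g) Initialize the DataBlockScanner; block verification is supported only with FSDataset.
(h) Initialize and start the DataNode info server infoServer; the default address is http://0.0.0.0:50075, and if HTTPS is enabled the default HTTPS port is 50475. infoServer also registers the DataBlockScanner servlet at http://0.0.0.0:50075/blockScannerReport.
(i) Initialize the IPC server used for RPC calls; the default port is 50020. The server is created here and started later, in run().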
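
Steps (b) and (e) interact: the DataNode first builds its name from the configured port, then corrects it with the port the socket actually bound to. A minimal sketch of that idea follows; EphemeralPortSketch and bindAndName are hypothetical names, and binding to the loopback address is an assumption made so the example runs anywhere. Requesting port 0 asks the operating system for any free port, so the real port is only known after bind().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

final class EphemeralPortSketch {
    /** Binds to the requested port (0 = any free port) and returns "machineName:actualPort". */
    static String bindAndName(String machineName, int requestedPort) {
        try (ServerSocket ss = new ServerSocket()) {
            ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), requestedPort));
            // Use the port the socket really got, not the one that was requested.
            return machineName + ":" + ss.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(bindAndName("dn1", 0));
    }
}
```

With a fixed port such as 50010 the requested and actual ports are the same; the adjustment only matters when the configuration asks for port 0.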

 

    /**
* This method starts the data node with the specified conf.
*  
* @param conf - the configuration
*  if conf's CONFIG_PROPERTY_SIMULATED property is set
*  then a simulated storage based data node is created.
*  
* @param dataDirs - only for a non-simulated storage data node
* @throws IOException
* @throws MalformedObjectNameException  
* @throws MBeanRegistrationException  
* @throws InstanceAlreadyExistsException  
*/  
void startDataNode(Configuration conf,   
AbstractList<File> dataDirs, SecureResources resources  
) throws IOException {  
if(UserGroupInformation.isSecurityEnabled() && resources == null)  
throw new RuntimeException("Cannot start secure cluster without " +  
"privileged resources.");  
this.secureResources = resources;  
// use configured nameserver & interface to get local hostname  
// Determine machineName
if (conf.get("slave.host.name") != null) {  
machineName = conf.get("slave.host.name");     
}  
if (machineName == null) {  
machineName = DNS.getDefaultHost(conf.get("dfs.datanode.dns.interface","default"),conf.get("dfs.datanode.dns.nameserver","default"));  
}  
// Resolve the NameNode service address
InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);
// Socket read and write timeouts
this.socketTimeout =  conf.getInt("dfs.socket.timeout",HdfsConstants.READ_TIMEOUT);  
this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",HdfsConstants.WRITE_TIMEOUT);  
/* Based on results on different platforms, we might need set the default  
* to false on some of them. */  
this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", true);  
// Write packet size, 64 KB by default
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
// Local address for the data streaming socket
InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);  
int tmpPort = socAddr.getPort();  
// DataStorage holds the storage-related information (the data storage information file)
storage = new DataStorage();
// construct registration
this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
// connect to name node: RPC.waitForProxy returns a dynamic proxy that implements DatanodeProtocol
this.namenode = (DatanodeProtocol) RPC.waitForProxy(DatanodeProtocol.class,DatanodeProtocol.versionID,nameNodeAddr, conf);
// get version and id info from the name-node; this mainly covers
// buildVersion and distributedUpgradeVersion, which are used for version checks
NamespaceInfo nsInfo = handshake();  

StartupOption startOpt = getStartupOption(conf);  
assert startOpt != null : "Startup option must be set.";  
boolean simulatedFSDataset = conf.getBoolean("dfs.datanode.simulateddatastorage", false);  
// Simulated storage is a test-only path; the real-storage branch below is the one analyzed here
if (simulatedFSDataset) {  
setNewStorageID(dnRegistration);  
dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;  
dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;  
// it would have been better to pass storage as a parameter to  
// constructor below - need to augment ReflectionUtils used below.  
conf.set("StorageId", dnRegistration.getStorageID());  
try {  
//Equivalent of following (can't do because Simulated is in test dir)  
//  this.data = new SimulatedFSDataset(conf);  
this.data = (FSDatasetInterface) ReflectionUtils.newInstance(  
Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);  
} catch (ClassNotFoundException e) {  
throw new IOException(StringUtils.stringifyException(e));  
}  
} else { // real storage  
// read storage info, lock data dirs and transition fs state if necessary  
storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);  
// adjust  
// Record the storage information in the registration
this.dnRegistration.setStorageInfo(storage);
// initialize data node internal structure
// Build the FSDataset from storage and conf; it handles the block operations
this.data = new FSDataset(storage, conf);  
}  
// register datanode MXBean  
this.registerMXBean(conf); // register the MXBean for DataNode  
// Allow configuration to delay block reports to find bugs  
artificialBlockReceivedDelay = conf.getInt(  
"dfs.datanode.artificialBlockReceivedDelay", 0);  
// find free port or use privileged port provide  
// Create the server socket: channel-based (NIO) when a write timeout is set, plain blocking I/O otherwise
ServerSocket ss;  
if(secureResources == null) {  
ss = (socketWriteTimeout > 0) ?   
ServerSocketChannel.open().socket() : new ServerSocket();  
Server.bind(ss, socAddr, 0);  
} else {  
ss = resources.getStreamingSocket();  
}  
// Set the socket receive buffer size (DEFAULT_DATA_SOCKET_SIZE, 128 KB in Hadoop 1.x)
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);   
// adjust machine name with the actual port  
tmpPort = ss.getLocalPort();  
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),  
tmpPort);  
this.dnRegistration.setName(machineName + ":" + tmpPort);  
LOG.info("Opened info server at " + tmpPort);  
// DataXceiverServer receives and sends block data. It listens for requests from
// clients and other DataNodes, and it does not use the Hadoop IPC mechanism.
// Initialize the handler, dataXceiverServer
this.threadGroup = new ThreadGroup("dataXceiverServer");  
this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));  
this.threadGroup.setDaemon(true); // auto destroy when empty  
// Block report interval, initial block report delay, and heartbeat interval
this.blockReportInterval = conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);  
this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay",BLOCKREPORT_INITIAL_DELAY)* 1000L;   
if (this.initialBlockReportDelay >= blockReportInterval) {  
this.initialBlockReportDelay = 0;  
LOG.info("dfs.blockreport.initialDelay is greater than " +  
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");  
}  
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;  

DataNode.nameNodeAddr = nameNodeAddr;  
//initialize periodic block scanner  
String reason = null;  
if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {  
reason = "verification is turned off by configuration";  
} else if ( !(data instanceof FSDataset) ) {  
reason = "verifcation is supported only with FSDataset";  
}   
if ( reason == null ) {  
// Periodic scanner: blockScanner verifies the stored blocks on a schedule
blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);  
} else {  
LOG.info("Periodic Block Verification is disabled because " +  
reason + ".");  
}  
//create a servlet to serve full-file content  
// Create the HttpServer (backed by Jetty internally) that serves the monitoring pages
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);  
String infoHost = infoSocAddr.getHostName();  
int tmpInfoPort = infoSocAddr.getPort();  
this.infoServer = (secureResources == null)   
? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,   
conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN))  
: new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,  
conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN),  
secureResources.getListener());  
if (conf.getBoolean("dfs.https.enable", false)) {  
boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);  
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(  
"dfs.datanode.https.address", infoHost + ":" + 0));  
Configuration sslConf = new Configuration(false);  
sslConf.addResource(conf.get("dfs.https.server.keystore.resource",  
"ssl-server.xml"));  
this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);  
}  
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);  
this.infoServer.addInternalServlet(null, "/getFileChecksum/*",  
FileChecksumServlets.GetServlet.class);  
this.infoServer.setAttribute("datanode", this);  
this.infoServer.setAttribute("datanode.blockScanner", blockScanner);  
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);  
this.infoServer.addServlet(null, "/blockScannerReport",   
DataBlockScanner.Servlet.class);  
if (WebHdfsFileSystem.isEnabled(conf, LOG)) {  
infoServer.addJerseyResourcePackage(DatanodeWebHdfsMethods.class  
.getPackage().getName() + ";" + Param.class.getPackage().getName(),  
WebHdfsFileSystem.PATH_PREFIX + "/*");  
}  
this.infoServer.start();  

// adjust info port  
this.dnRegistration.setInfoPort(this.infoServer.getPort());  
myMetrics = DataNodeInstrumentation.create(conf,  
dnRegistration.getStorageID());  
// set service-level authorization security policy  
if (conf.getBoolean(  
ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {  
ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());  
}  
// BlockTokenSecretManager is created here, but it shouldn't be  
// used until it is initialized in register().  
this.blockTokenSecretManager = new BlockTokenSecretManager(false,0, 0);  
//init ipc server  
// Create the local IPC server, which listens for requests from clients and other DataNodes
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(conf.get("dfs.datanode.ipc.address"));  
ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),   
conf.getInt("dfs.datanode.handler.count", 3), false, conf,  
blockTokenSecretManager);  
dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());  
LOG.info("dnRegistration = " + dnRegistration);  
}  
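
The timing logic in startDataNode() converts the initial block report delay from seconds to milliseconds and resets it to zero when it is not smaller than the report interval. A small sketch of that rule follows; ReportTimingSketch and effectiveInitialDelayMs are hypothetical names, and the numbers in main() are illustrative values rather than claimed Hadoop defaults.

```java
final class ReportTimingSketch {
    /**
     * Converts the configured initial delay (seconds) to milliseconds and
     * drops it to 0 when it is not smaller than the report interval.
     */
    static long effectiveInitialDelayMs(long initialDelaySeconds, long reportIntervalMs) {
        long delayMs = initialDelaySeconds * 1000L;
        return (delayMs >= reportIntervalMs) ? 0L : delayMs;
    }

    public static void main(String[] args) {
        long intervalMs = 60L * 60L * 1000L; // example: one hour between block reports
        System.out.println(effectiveInitialDelayMs(120, intervalMs));
        System.out.println(effectiveInitialDelayMs(7200, intervalMs));
    }
}
```

The reset avoids a configuration in which the first block report would be postponed past the point where a regular report is already due.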
 
 
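(6) Running the DataNode
runDatanodeDaemon() registers the DataNode with the NameNode, creates the DataNode thread, marks it as a daemon thread, and starts it.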

    /** Start a single datanode daemon and wait for it to finish.
     *  If this thread is specifically interrupted, it will stop waiting.
     */
    public static void runDatanodeDaemon(DataNode dn) throws IOException {
        if (dn != null) {
            //register datanode
            dn.register();
            dn.dataNodeThread = new Thread(dn, dnThreadName);
            dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
            dn.dataNodeThread.start();
        }
    }
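
Since the DataNode thread is a daemon thread, it does not keep the JVM alive by itself, which is one reason the main thread has to wait in join(). The sketch below illustrates creating and starting such a thread; DaemonStartSketch and startDaemon are hypothetical names, and the printed message stands in for the real service loop.

```java
final class DaemonStartSketch {
    /** Wraps the task in a named daemon thread, starts it, and returns the thread. */
    static Thread startDaemon(Runnable task, String name) {
        Thread t = new Thread(task, name);
        t.setDaemon(true); // must be set before start(); a daemon thread does not block JVM exit
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = startDaemon(() -> System.out.println("service loop running"), "DataNode-sketch");
        t.join(); // without this wait the JVM could exit before the daemon thread runs
        System.out.println(t.getName() + " daemon=" + t.isDaemon());
    }
}
```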
 
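 run() starts the DataXceiverServer and the IPC server, then enters normal operation: in a loop it checks whether a distributed upgrade is needed and calls offerService() to provide service.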

    public void run() {
        LOG.info(dnRegistration + "In DataNode.run, data = " + data);
        // Start the streaming server that reads and writes block data
        // start dataXceiveServer
        dataXceiverServer.start();
        // Start the internal Hadoop IPC server
        ipcServer.start();
        while (shouldRun) {
            try {
                // Check whether a distributed upgrade of the file system is needed
                startDistributedUpgradeIfNeeded();
                // Serve: send periodic heartbeats to the NameNode and execute the commands it returns
                offerService();
            } catch (Exception ex) {
                LOG.error("Exception: " + StringUtils.stringifyException(ex));
                if (shouldRun) {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException ie) {
                    }
                }
            }
        }
        LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
        shutdown();
    }
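
The structure of run(), a loop that keeps calling the service method and pauses before retrying after a failure, can be reproduced in a few lines. This is a sketch under assumptions: ServiceLoopSketch is a hypothetical class, its offerService() simply fails twice and then asks the loop to stop, and, unlike the excerpt above, it restores the interrupt flag and stops the loop when the sleep is interrupted.

```java
final class ServiceLoopSketch implements Runnable {
    private volatile boolean shouldRun = true;
    private final long retryDelayMs;
    private int attempts = 0;
    private int failures = 0;

    ServiceLoopSketch(long retryDelayMs) {
        this.retryDelayMs = retryDelayMs;
    }

    /** Hypothetical service call: fails twice, then asks the loop to stop. */
    private void offerService() throws Exception {
        attempts++;
        if (attempts <= 2) {
            throw new Exception("NameNode unreachable (simulated)");
        }
        shouldRun = false;
    }

    @Override
    public void run() {
        while (shouldRun) {
            try {
                offerService();
            } catch (Exception ex) {
                failures++;
                if (shouldRun) {
                    try {
                        Thread.sleep(retryDelayMs); // back off before the next attempt
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        shouldRun = false; // sketch-only choice: stop when interrupted
                    }
                }
            }
        }
    }

    int attempts() { return attempts; }

    int failures() { return failures; }

    public static void main(String[] args) {
        ServiceLoopSketch loop = new ServiceLoopSketch(10);
        loop.run();
        System.out.println("attempts=" + loop.attempts() + " failures=" + loop.failures());
    }
}
```

The volatile shouldRun flag lets another thread request shutdown, and the pause after a failure keeps a persistent error from turning into a busy loop.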
 
 
