设为首页 收藏本站
查看: 1269|回复: 0

[经验分享] 第七章:小朱笔记hadoop之源码分析-hdfs分析 Datanode 心跳分析

[复制链接]

尚未签到

发表于 2016-12-13 09:57:28 | 显示全部楼层 |阅读模式
第七章:小朱笔记hadoop之源码分析-hdfs分析

第五节:Datanode 分析

5.2 Datanode 心跳分析

(1)offerService分析

写道

(a)检查心跳间隔是否超时,如是向namenode发送心跳报告,内容是dfs的容量、剩余的空间和DataXceiverServer的数量等,调用processCommand方法处理namenode返回的命令
(b)通知namenode已经接收的块
(c)检查块报告间隔是否超时,如是向namenode发送块报告,调用processCommand方法处理namenode返回的命令
(d)如果没到下个发送心跳的时候,休眠

 

    /**
* Main loop for the DataNode.  Runs until shutdown,
* forever calling remote NameNode functions.
*  
*   1.检查心跳间隔是否超时,如是向namenode发送心跳报告,内容是dfs的容量、剩余的空间和DataXceiverServer的数量等,调用processCommand方法处理namenode返回的命令
*   2.通知namenode已经接收的块
*   3.检查块报告间隔是否超时,如是向namenode发送块报告,调用processCommand方法处理namenode返回的命令
*   4.如果没到下个发送心跳的时候,休眠
*  
*  
*   DNA_UNKNOWN = 0:未知操作
*   DNA_TRANSFER = 1:传输块到另一个datanode,创建DataTransfer来传输每个块,请求的类型是OP_WRITE_BLOCK,使用BlockSender来发送块和元数据文件,不对块进行校验
*   DNA_INVALIDATE = 2:不合法的块,将所有块删除
*   DNA_SHUTDOWN = 3:停止datanode,停止infoServer、DataXceiverServer、DataBlockScanner和处理线程,将存储目录解锁,DataBlockScanner结束可能需要等待1小时
*   DNA_REGISTER = 4:重新注册
*   DNA_FINALIZE = 5:完成升级,调用DataStorage的finalizeUpgrade方法完成升级
*   DNA_RECOVERBLOCK = 6:请求块恢复,创建线程来恢复块,每个线程服务一个块,对于每个块,调用recoverBlock来恢复块信息
*  
*   会利用保存在receivedBlockList和delHints两个列表中的信息。
*   receivedBlockList表明在这个DataNode成功创建的新的数据块
*   delHints,是可以删除该数据块的节点
*/  
public void offerService() throws Exception {  
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" +   
" Initial delay: " + initialBlockReportDelay + "msec");  
//  
// Now loop for a long time....  
//  
while (shouldRun) {  
try {  
long startTime = now();  
//  
// Every so often, send heartbeat or block-report  
//  
if (startTime - lastHeartbeat > heartBeatInterval) {  
//定期发送心跳  
//  
// All heartbeat messages include following info:  
// -- Datanode name  
// -- data transfer port  
// -- Total capacity  
// -- Bytes remaining  
//  
lastHeartbeat = startTime;  
DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,  
data.getCapacity(),  
data.getDfsUsed(),  
data.getRemaining(),  
xmitsInProgress.get(),  
getXceiverCount());  
myMetrics.addHeartBeat(now() - startTime);  
//LOG.info("Just sent heartbeat, with name " + localName);  
//响应namenode返回的命令做处理  
if (!processCommand(cmds))  
continue;  
}  
// check if there are newly received blocks  
Block [] blockArray=null;  
String [] delHintArray=null;  
synchronized(receivedBlockList) {  
synchronized(delHints) {  
int numBlocks = receivedBlockList.size();  
if (numBlocks > 0) {  
if(numBlocks!=delHints.size()) {  
LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );  
}  
//  
// Send newly-received blockids to namenode  
//  
blockArray = receivedBlockList.toArray(new Block[numBlocks]);// receivedBlockList表明在这个DataNode成功创建的新的数据块,而delHints,是可以删除该数据块的节点  
delHintArray = delHints.toArray(new String[numBlocks]);//在datanode.notifyNamenodeReceivedBlock函数中发生变化  
}  
}  
}  
if (blockArray != null) {  
if(delHintArray == null || delHintArray.length != blockArray.length ) {  
LOG.warn("Panic: block array & delHintArray are not the same" );  
}  
namenode.blockReceived(dnRegistration, blockArray, delHintArray);;//Block状态变化报告通过NameNode.blockReceived来报告。  
synchronized (receivedBlockList) {  
synchronized (delHints) {  
for(int i=0; i<blockArray.length; i++) {  
receivedBlockList.remove(blockArray);  
delHints.remove(delHintArray);  
}  
}  
}  
}  
// Send latest blockinfo report if timer has expired.  
if (startTime - lastBlockReport > blockReportInterval) {// 向namenode报告系统中Block状态的变化  
if (data.isAsyncBlockReportReady()) {  
// Create block report  
long brCreateStartTime = now();  
Block[] bReport = data.retrieveAsyncBlockReport();  
// Send block report  
long brSendStartTime = now();  
//向Namenode报告其上的块状态报告   
DatanodeCommand cmd = namenode.blockReport(dnRegistration,  
BlockListAsLongs.convertToArrayLongs(bReport));  
// Log the block report processing stats from Datanode perspective  
long brSendCost = now() - brSendStartTime;  
long brCreateCost = brSendStartTime - brCreateStartTime;  
myMetrics.addBlockReport(brSendCost);  
LOG.info("BlockReport of " + bReport.length  
+ " blocks took " + brCreateCost + " msec to generate and "  
+ brSendCost + " msecs for RPC and NN processing");  
//  
// If we have sent the first block report, then wait a random  
// time before we start the periodic block reports.  
//  
if (resetBlockReportTime) {  
lastBlockReport = startTime -  
R.nextInt((int)(blockReportInterval));  
resetBlockReportTime = false;  
} else {  
/* say the last block report was at 8:20:14. The current report  
* should have started around 9:20:14 (default 1 hour interval).  
* If current time is :
*   1) normal like 9:20:18, next report should be at 10:20:14
*   2) unexpected like 11:35:43, next report should be at
*      12:20:14
*/  
lastBlockReport += (now() - lastBlockReport) /   
blockReportInterval * blockReportInterval;  
}  
processCommand(cmd);  
} else {  
data.requestAsyncBlockReport();  
if (lastBlockReport > 0) { // this isn't the first report  
long waitingFor =  
startTime - lastBlockReport - blockReportInterval;  
String msg = "Block report is due, and been waiting for it for " +  
(waitingFor/1000) + " seconds...";  
if (waitingFor > LATE_BLOCK_REPORT_WARN_THRESHOLD) {  
LOG.warn(msg);  
} else if (waitingFor > LATE_BLOCK_REPORT_INFO_THRESHOLD) {  
LOG.info(msg);  
} else if (LOG.isDebugEnabled()) {  
LOG.debug(msg);  
}  
}  
}  
}  
// start block scanner;//启动blockScanner线程 进行block扫描  
if (blockScanner != null && blockScannerThread == null &&  
upgradeManager.isUpgradeCompleted()) {  
LOG.info("Starting Periodic block scanner.");  
blockScannerThread = new Daemon(blockScanner);  
blockScannerThread.start();  
}  
//  
// There is no work to do;  sleep until hearbeat timer elapses,   
// or work arrives, and then iterate again.  
//  
long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);  
synchronized(receivedBlockList) {  
if (waitTime > 0 && receivedBlockList.size() == 0) {  
try {  
receivedBlockList.wait(waitTime);  
} catch (InterruptedException ie) {  
}  
delayBeforeBlockReceived();  
}  
} // synchronized  
} catch(RemoteException re) {  
String reClass = re.getClassName();  
if (UnregisteredDatanodeException.class.getName().equals(reClass) ||  
DisallowedDatanodeException.class.getName().equals(reClass) ||  
IncorrectVersionException.class.getName().equals(reClass)) {  
LOG.warn("DataNode is shutting down: " +   
StringUtils.stringifyException(re));  
shutdown();  
return;  
}  
LOG.warn(StringUtils.stringifyException(re));  
} catch (IOException e) {  
LOG.warn(StringUtils.stringifyException(e));  
}  
} // while (shouldRun)  
} // offerService  
(2)processCommand分析

写道

DNA_UNKNOWN = 0:未知操作
DNA_TRANSFER = 1:传输块到另一个datanode,创建DataTransfer来传输每个块,请求的类型是OP_WRITE_BLOCK,使用BlockSender来发送块和元数据文件,不对块进行校验
DNA_INVALIDATE = 2:块已失效,将命令中列出的这些块全部删除
DNA_SHUTDOWN = 3:停止datanode,停止infoServer、DataXceiverServer、DataBlockScanner和处理线程,将存储目录解锁,DataBlockScanner结束可能需要等待1小时
DNA_REGISTER = 4:重新注册
DNA_FINALIZE = 5:完成升级,调用DataStorage的finalizeUpgrade方法完成升级
DNA_RECOVERBLOCK = 6:请求块恢复,创建线程来恢复块,每个线程服务一个块,对于每个块,调用recoverBlock来恢复块信息

 

    /**
*   DNA_UNKNOWN = 0:未知操作
*   DNA_TRANSFER = 1:传输块到另一个datanode,创建DataTransfer来传输每个块,请求的类型是OP_WRITE_BLOCK,使用BlockSender来发送块和元数据文件,不对块进行校验
*   DNA_INVALIDATE = 2:不合法的块,将所有块删除
*   DNA_SHUTDOWN = 3:停止datanode,停止infoServer、DataXceiverServer、DataBlockScanner和处理线程,将存储目录解锁,DataBlockScanner结束可能需要等待1小时
*   DNA_REGISTER = 4:重新注册
*   DNA_FINALIZE = 5:完成升级,调用DataStorage的finalizeUpgrade方法完成升级
*   DNA_RECOVERBLOCK = 6:请求块恢复,创建线程来恢复块,每个线程服务一个块,对于每个块,调用recoverBlock来恢复块信息
*  
* @param cmd
* @return true if further processing may be required or false otherwise.  
* @throws IOException
*/  
private boolean processCommand(DatanodeCommand cmd) throws IOException {  
if (cmd == null)  
return true;  
final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;  
switch(cmd.getAction()) {  
//传输块到另一个datanode,创建DataTransfer来传输每个块,请求的类型是OP_WRITE_BLOCK,使用BlockSender来发送块和元数据文件,不对块进行校验  
case DatanodeProtocol.DNA_TRANSFER:  
// Send a copy of a block to another datanode  
transferBlocks(bcmd.getBlocks(), bcmd.getTargets());  
myMetrics.incrBlocksReplicated(bcmd.getBlocks().length);  
break;  
//不合法的块,将所有块删除  
case DatanodeProtocol.DNA_INVALIDATE:  
//  
// Some local block(s) are obsolete and can be   
// safely garbage-collected.  
//  
Block toDelete[] = bcmd.getBlocks();  
try {  
if (blockScanner != null) {  
blockScanner.deleteBlocks(toDelete);  
}  
data.invalidate(toDelete);  
} catch(IOException e) {  
checkDiskError();  
throw e;  
}  
myMetrics.incrBlocksRemoved(toDelete.length);  
break;  
//停止datanode,停止infoServer、DataXceiverServer、DataBlockScanner和处理线程,将存储目录解锁,DataBlockScanner结束可能需要等待1小时  
case DatanodeProtocol.DNA_SHUTDOWN:  
// shut down the data node  
this.shutdown();  
return false;  
//重新注册  
case DatanodeProtocol.DNA_REGISTER:  
// namenode requested a registration - at start or if NN lost contact  
LOG.info("DatanodeCommand action: DNA_REGISTER");  
if (shouldRun) {  
register();  
}  
break;  
//完成升级,调用DataStorage的finalizeUpgrade方法完成升级  
case DatanodeProtocol.DNA_FINALIZE:  
storage.finalizeUpgrade();  
break;  
case UpgradeCommand.UC_ACTION_START_UPGRADE:  
// start distributed upgrade here  
processDistributedUpgradeCommand((UpgradeCommand)cmd);  
break;  
//请求块恢复,创建线程来恢复块,每个线程服务一个块,对于每个块,调用recoverBlock来恢复块信息  
case DatanodeProtocol.DNA_RECOVERBLOCK:  
recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());  
break;  
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:  
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");  
if (isBlockTokenEnabled) {  
blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());  
}  
break;  
//块均衡  
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:  
LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");  
int vsn = ((BalancerBandwidthCommand) cmd).getBalancerBandwidthVersion();  
if (vsn >= 1) {  
long bandwidth =   
((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();  
if (bandwidth > 0) {  
DataXceiverServer dxcs =  
(DataXceiverServer) this.dataXceiverServer.getRunnable();  
dxcs.balanceThrottler.setBandwidth(bandwidth);  
}  
}  
break;  
default:  
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());  
}  
return true;  
}  
 
 
 
 
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313630-1-1.html 上篇帖子: [Hadoop]Hive r0.9.0中文文档(二)之联表查询Join 下篇帖子: 第七章:小朱笔记hadoop之源码分析-hdfs分析 DataBlockScanner 文件校验
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表