设为首页 收藏本站
查看: 1080|回复: 0

[经验分享] 第七章:小朱笔记hadoop之源码分析-hdfs分析 DataBlockScanner 文件校验

[复制链接]

尚未签到

发表于 2016-12-13 09:58:19 | 显示全部楼层 |阅读模式
第七章:小朱笔记hadoop之源码分析-hdfs分析

第五节:Datanode 分析

5.4  DataBlockScanner 文件校验

由于每一个磁盘或者是网络上的I/O操作可能会对正在读写的数据处理不慎而出现错误,所以HDFS提供了下面两种数据检验方式,以此来保证数据的完整性,而且这两种检验方式在DataNode节点上是同时工作的:
(1)校验和
检测损坏数据的常用方法是在第一次进行系统时计算数据的校验和,在通道传输过程中,如果新生成的校验和不完全匹配原始的校验和,那么数据就会被认为是被损坏的。
(2)数据块检测程序(DataBlockScanner)
在DataNode节点上开启一个后台线程,来定期验证存储在它上所有块,这个是防止物理介质出现损减情况而造成的数据损坏。
 
  关于校验和,HDFS以透明的方式检验所有写入它的数据,并在默认设置下,会在读取数据时验证校验和。正对数据的每一个校验块,都会创建一个单独的校验 和,默认校验块大小是512字节,对应的校验和是4字节。DataNode节点负载在存储数据(当然包括数据的校验和)之前验证它们收到的数据, 如果此 DataNode节点检测到错误,客户端会收到一个CheckSumException。客户端读取DataNode节点上的数据时,会验证校验和,即将 其与DataNode上存储的校验和进行比较。每一个DataNode节点都会维护着一个连续的校验和和验证日志,里面有着每一个Block的最后验证时 间。客户端成功验证Block之后,便会告诉DataNode节点,Datanode节点随之更新日志。
  DataBlockScanner是在一个单独的线程里面进行执行,作用是周期性的对block进行校验,当DFSClient读取时,也会通知DataBlockScanner校验结果。
  DataBlockScanner最大扫描速度是8 MB/s,通过BlockTransferThrottler来限制流量,最小扫描速度是1 MB/s,默认扫描周期是21天,扫描周期可通过dfs.datanode.scan.period.hours来设置。
  该类中的属性TreeSet<BlockScanInfo> blockInfoSet用来保存block的扫描时间,离现在时间间隔最长的排在首位,扫描的过程如下:
  检查blockInfoSet中的第一个block的最后扫描时间距离现在是否超过扫描周期,如果不超过,休眠一定时间然后开始下次检查,如果扫描周期, 那么对该block进行校验,校验使用BlockSender来读取,读取的数据输出到 NullOutputStream,我们知道BlockSender在读取数据时,可以检查checksum,以此来判断是否校验成功。如果校验失败,进 行第二次校验,如果两次都失败,说明该block有错误,通知namenode。
  与扫描日志相关的参数有:

    最大扫描速度是8 MB/s,通过BlockTransferThrottler来限制流量  
最小扫描速度是1 MB/s  
默认扫描周期是3周,扫描周期可通过配置${dfs.datanode.scan.period.hours}来设置  

写道

日志文件名前缀是dncp_block_verification.log
共有两个日志:当前日志,文件后缀是.curr;前一个日志,文件后缀是.prev
minRollingPeriod:日志最小滚动周期是6小时
minWarnPeriod:日志最小警告周期是6小时,在一个警告周期内只有发出一个警告
minLineLimit:日志最小行数限制是1000

采用滚动日志方式,只有当前行数curNumLines超过最大行数maxNumLines,并且距离上次滚动日志的时间超过minRollingPeriod时,才将dncp_block_verification.log.curr重命名为dncp_block_verification.log.prev,将新的日志写到dncp_block_verification.log.curr中。

    class DataBlockScanner implements Runnable {  
public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);  
private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec  
private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec  
static final long DEFAULT_SCAN_PERIOD_HOURS = 21 * 24L; // three weeks  
private static final long ONE_DAY = 24 * 3600 * 1000L;  
static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");  
static final String verificationLogFile = "dncp_block_verification.log";  
static final int verficationLogLimit = 5; // * numBlocks.  
//一个扫描周期,可以由Datanode的配置文件来设置,配置项是:dfs.datanode.scan.period.hours,单位是小时,默认的值是21*24*60*60*1000 ms  
private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;  
DataNode datanode;  
//数据块管理器  
FSDataset dataset;  
// sorted set  
//数据块扫描信息集合,按照上一次扫描时间和数据块id升序排序,以便快速获取验证到期的数据块;  
TreeSet<BlockScanInfo> blockInfoSet;  
//数据块和数据块扫描信息的映射,以便能够根据数据块快速获取对应的扫描信息;  
HashMap<Block, BlockScanInfo> blockMap;  
long totalScans = 0;  
long totalVerifications = 0; // includes remote verification by clients.  
long totalScanErrors = 0;  
long totalTransientErrors = 0;  
long currentPeriodStart = System.currentTimeMillis();  
//一个扫描周期中还剩下需要扫描的数据量;  
long bytesLeft = 0; // Bytes to scan in this period  
//一个扫描周期中需要扫描的总数据量;  
long totalBytesToScan = 0;  
//数据块的扫描验证日志记录器;  
private LogFileHandler verificationLog;  
Random random = new Random();  
//扫描时I/O速度控制器,需要根据totalBytesToScan和bytesLeft信息来衡量;  
BlockTransferThrottler throttler = null;  
private static enum ScanType {  
REMOTE_READ, // Verified when a block read by a client etc  
VERIFICATION_SCAN, // scanned as part of periodic verfication  
NONE,  
}  

......  
}  

     
         DataBlockScanner被DataNode节点用来检测它所管理的所有Block数据块的一致性,因此,对已DataNode节点上的每一个 Block,它都会每隔scanPeriod ms利用Block对应的校验和文件来检测该Block一次,看看这个Block的数据是否已经损坏。由于scanPeriod 的值一般比较大,因为对DataNode节点上的每一个Block扫描一遍要消耗不少系统资源,这就可能带来另外一个问题就是在一个扫描周 期内可能会出现DataNode节点重启的情况,所以为了提供系统性能,避免DataNode节点在启动之后对还没有过期的Block又扫描一 遍,DataBlockScanner在其内部使用了日志记录器来持久化保存每一个Block上一次扫描的时间,这样的话, DataNode节点在启动之后通过日志文件来恢复之前所有Block的有效时间。另外,DataNode为了节约系统资源,它对Block的验证不仅仅 只依赖于DataBlockScanner后台线程(VERIFICATION_SCAN方式),还会在向某一个客户端传送Block的时候来更行该 Block的扫描时间(REMOTE_READ方式),这是因为DataNode向客户端传送一个Block的时候要必须校验该数据块。那么这个时候日志 记录器并不会马上把该数据块的扫描信息写到日志,毕竟频繁的磁盘I/O会导致性能下降,至于何时对该Block的最新扫描时间写日志有一个判断条件: 1.如果是VERIFICATION_SCAN方式的Block验证,必须记日志; 2.如果是REMOTE_READ方式,那么该Block上一次的记录日志到现在的时间间隔超过24小时或者超过scanPeriod/3 ms 的话,记日志。 下面来结合源码详细讨论这个过程:
 
 

    public void run() {  
try {  
//初始化   
//1.为每个Block创建BlockScanInfo  
//2.创建 日志扫描器 LogFileHandler   
//3.创建 扫描速度控制器 BlockTransferThrottler  
init();  
// Read last verification times  
//为每一个Block分配上一次验证的时间   
if (!assignInitialVerificationTimes()) {  
return;  
}  
//调整扫描速度  
adjustThrottler();  
while (datanode.shouldRun && !Thread.interrupted()) {  
long now = System.currentTimeMillis();  
synchronized (this) {  
if (now >= (currentPeriodStart + scanPeriod)) {  
//新周期  
startNewPeriod();  
}  
}  
if ((now - getEarliestScanTime()) >= scanPeriod) {  
//校验  
verifyFirstBlock();  
} else {  
try {  
Thread.sleep(1000);  
} catch (InterruptedException ignored) {  
}  
}  
}  
} catch (RuntimeException e) {  
LOG.warn("RuntimeException during DataBlockScanner.run() : " + StringUtils.stringifyException(e));  
throw e;  
} finally {  
shutdown();  
LOG.info("Exiting DataBlockScanner thread.");  
}  
}  
(1)初始化init
 初始化两个关键数据结构
 

blockInfoSet = new TreeSet<BlockScanInfo>();  
blockMap = new HashMap<Block, BlockScanInfo>();  
为每个Block创建BlockScanInfo,创建日志扫描器 LogFileHandler ,创建扫描速度控制器 BlockTransferThrottler  
(2)为每一个Block分配上一次验证的时间
 

    BlockScanInfo info;  
while ((info = blockInfoSet.first()).lastScanTime < 0) {  
delBlockInfo(info);  
info.lastScanTime = lastScanTime;  
lastScanTime += verifyInterval;  
addBlockInfo(info);  
}   
(3)调整扫描速度
 在一次Blocks扫描验证周期中,DataBlockScanner需要进行大量的 磁盘I/O,为了不影响DataNode节点上其它线程的工作资源,同时也为了自身工作的有效性,所以DataBlockScanner采用了扫描验证速 度控制器,根据当前的工作量来控制当前数据块的验证速度。
 

    //本次扫描验证还剩余的时间   
long timeLeft = currentPeriodStart + scanPeriod - System.currentTimeMillis();  
//根据本次验证扫描剩余的工作量和时间来计算速度   
long bw = Math.max(bytesLeft * 1000 / timeLeft, MIN_SCAN_RATE);  
throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));  
(4)DataNode节点在向客户端或者其它DataNode节点传输数据时,客户端 或者其它DataNode节点会根据接收的数据校验和来验证接收到的数据,当验证出错时,它们会通知传送节点。DataBlockScanner通过自己 扮演传输者又扮演接受者来实现数据块的验证的;同时为了防止本地磁盘的I/O的错误,DataBlockScanner采用了两次传输-接收来确保验证的 Block的数据是出错了(损坏了)。当发现有出错的Block是,就需要向NameNode节点报告,由NameNode来决定如何处理这个数据块,而 不是由DataNode节点擅自作主清除该Block数据信息。
 

    blockSender = new BlockSender(block, 0, -1, false, false, true, datanode);  
DataOutputStream out = new DataOutputStream(new IOUtils.NullOutputStream());  
blockSender.sendBlock(out, null, throttler);  
 
(5)错误处理

    DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };  
LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };  
//向NameNode节点发送出错的Block   
datanode.namenode.reportBadBlocks(blocks);  
 
 
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313631-1-1.html 上篇帖子: 第七章:小朱笔记hadoop之源码分析-hdfs分析 Datanode 心跳分析 下篇帖子: 第七章:小朱笔记hadoop之源码分析-hdfs分析 DataNode 数据读写分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表