设为首页 收藏本站
查看: 1248|回复: 0

[经验分享] Hadoop学习笔记(3) Hadoop文件系统二

[复制链接]

尚未签到

发表于 2017-12-16 22:35:40 | 显示全部楼层 |阅读模式
  1 查询文件系统
  (1) 文件元数据:FileStatus,该类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及版权信息。FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。
  例:展示文件状态信息
  

public>
private MiniDFSCluster cluster;  

private FileSystem fs;  

  
@Before
  

public void setup() throws IOException{  
Configuration conf
= new Configuration();  

if(System.getProperty("test.build.data") == null){  
System.setProperties(
"test.build.data","/tmp");  
}
  
cluster
= new MiniDFSCluster(conf,1,true,null);  
fs
= cluster.getFileSystem();  
OutputStream out
= fs.create(new Path("/dir/file"));  
out.write(
"content".getBytes("UTF-8"));  
out.close();
  
}
  

  
@After
  

public void tearDown() throws IOException{  

if(fs != null){  
fs.close();
  
}
  

if(cluster != null){  
cluster.shutdown();
  
}
  
}
  

  
@Test(expected
= FileNotFoundException.class)  

public void throwsNotFoundForNonExistentFile() throws IOException{  
fs.getFileStatus(
new Path("no-such-file"));  
}
  

  
@Test
  

public void fileStatusForFile() throws IOException{  
Path file
= new Path("/dir/file");  
FileStatus stat
= fs.getFileStatus(file);  
assertThat(stat.getPath().tuUri().getPath(),is(
"/dir/file"));  
assertThat(stat.isDir(),is(
false));  
assertThat(stat.getLen(),is(
7L));  
assertThat(stat.getModificationTime(),
  
is(lessThanOrEqualTo(System.currentTimeMillis())));
  
assertThat(stat.getReplication(),is((
short)1));  
assertThat(stat.getBlockSize(),is(
64*1024*1024L));  
assertThar(stat.getOwner(),is(
"tom"));  
assertThat(stat.getGroup(),is(
"supergroup"));  
assertThat(stat.getPermission().toString(),is(
"rw-r--r--"));  
}
  

  
@Test
  

public void fileStatusForDirectory() throws IOException{  
Path dir
= new Path("/dir");  
FileStatus stat
= fs.getFileStatus(dir);  
assertThat(stat.getPath().tuUri().getPath(),is(
"/dir"));  
assertThat(stat.isDir(),is(
true));  
assertThat(stat.getLen(),is(
0L));  
assertThat(stat.getModificationTime(),
  
is(lessThanOrEqualTo(System.currentTimeMillis())));
  
assertThat(stat.getReplication(),is((
short)0));  
assertThat(stat.getBlockSize(),is(
0L));  
assertThar(stat.getOwner(),is(
"tom"));  
assertThat(stat.getGroup(),is(
"supergroup"));  
assertThat(stat.getPermission().toString(),is(
"rwxr-xr-x"));  

  
}
  
}
  

  (2) 列出文件
  FileSystem的listStatus()方法可以列出目录的内容:
  

public FileStatus[] listStatus(Path f) throws IOException  // 允许使用PathFilter来限制匹配的文件和目录
  

public FileStatus[] listStatus(Path f,PathFilter filter) throws IOException  // 指定一组目录,结果相当于依次轮流传递每条路径并对其调用listStatus()方法,再将FileStatus对象数组累积存入同一数组中
  

public FileStatus[] listStatus(Path[] files) throws IOException  

public FileStatus[] listStatus(Path[] files,PathFilter filter) throws IOException  

  例:显示Hadoop文件系统一组路径的文件信息
  

public>
public static void main(String[] args) throws Exception{  
String uri
= args[0];  
Configuration conf
= new Configuration();  
FileSystem fs
= FileSystem.get(URI(uri),conf);  

  
Path[] paths
= new Path[args.length];  

for(int i = 0; i < paths.length; i++){  
paths  
= new Path(args);  
}
  

  
FileStatus[] status
= fs.listStatus(paths);  
Path[] listedPaths
= FileUtil.stat2Paths(status);  

for(Path p : listedPaths){  
System.out.println(p);
  
}
  
}
  
}
  

  运行示例:
  % hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
  (3)  文件模式
  Hadoop为执行通配提供了两个FileSystem方法:
  

public FileStatus[] globStatus(Path pathPattern) throws IOException  

public FileStatus[] globStatus(Path pathPattern,PathFilter filter) throws IOException  

  globStatus()方法返回与路径相匹配的所有文件的FileStatus对象数组,并按路径排序。
  例:用于排除匹配正则表达式路径的PathFilter
  

public>
private final String regex;  

public RegexExcludePathFilter(String regex){  

this.regex = regex;  
}
  

  

public boolean accept(Path path){  

return !path.toString().matches(regex);  
}
  
}
  

  如下示例可扩展到/2007/12/30:
  fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$"))
  (4) 删除数据
  使用FileSystem的delete()方法可以永久性删除文件和目录:
  

public void delete(Path f,boolean recursive) throws IOException  

  当f是一个文件或空目录,recursive的值会被忽略。只有在recursive的值为true时,一个非空目录及其内容才能被删除。
  2. 数据流
  (1) 文件读取剖析
DSC0000.jpg

  客户端通过调用FileSystem对象的open()方法打开希望读取的文件(step 1)。DistributedFileSystem通过使用RPC来调用namenode,以确定文件起始块的位置(step 2)。对于每一个块,namenode返回存有该块复本的datanode地址,此外,这些datanode根据它们与客户端的距离来排序。
  DistributedFileSystem类返回一个FSDataFileInputStream对象给客户端并读取数据,FSDataFileInputStream类转而封装DFSInputStream对象。接着客户端对这个输入流调用read()方法(step 3)。存储着文件起始块的datanode地址的DFSInputStream随即连接距离最近的datanode。通过对数据流反复调用read()方法,可以将数据从datanode传输到客户端(step 4)。到达块的末端时,DFSIputStream会关闭与该datanode的连接,然后寻找下一个块的最佳datanode(step 5)。
  客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新建连接的顺序读取的。它也要询问namenode来检索下一批所需块的datanode的位置。一旦客户端完成读取,就会对FSDataInputStream调用close()方法(step 6)。
  在读取数据的时候,如果DFSInputStream在与datanode通信时遇到错误,,它便会尝试从这个块的另一个最近邻的datanode读取数据。它也会记住那个故障datanode,以保证以后不会反复读取该节点上后续的块。DFSInpuStream也会通过校验和确认从datanode发来的数据是否完整。如果发现一个损坏的块,它就会在DFSInputStream试图从其他datanode读取一个块的复本之前通知namenode。
  该设计的重点是:namenode告知客户端每个块中最佳的datanode,并让客户端直接联系该datanode且检索数据。
  (2) 文件写入剖析
DSC0001.jpg

  客户端通过对DistributedFileSystem对象调用create()函数来创建文件(step 1)。DistributedFileSystem对namenode创建一个RPC调用,在文件系统的命名空间中创建一个新文件,此时该文件中还没有相应的数据块(step 2)。namenode执行各种不同的检查以确保这个文件不存在,并且客户端有创建该文件的权限。检查均通过,namenode就会创建新文件记录一条记录。DistributedFileSystem向客户端返回一个FSDataOutputStream对象,由此客户端可以开始写入数据。FSDataOutputStream封装一个DFSOutputStream。
  在客户端写入数据时(step 3),DFSOutputStream将它分成一个一个的数据包,并写入内部队列,称为"数据队列"(data queue)。DataStreamer处理数据队列,它的责任是根据datanode列表来要求namenode分配适合的新块来存储数据备份。这一组datanode构成一个管线(假设复本数为3)。DataStreamer将数据包流式传输到管线中第1个datanode,该datanode存储数据包并将它发送到管线中的第2个datanode。同样地,第2个datanode存储该数据包并且发送给管线中的第3个datanode(step 4).
  DFSOutputStream也维护着一个内部数据包队列来等待datanode的收到确认回执,称为"确认队列"(ask queue)。当收到管道中所有datanode确认信息后,该数据包才会从确认队列删除(step 5)。
  如果在数据写入期间,datanode发生故障,则执行以下操作。首先关闭管线,确认把队列中的任何数据包都添加回数据队列的最前端,以确保故障节点的下游的datanode不会漏掉任何一个数据包。为存储在另一个正常datanode的当前数据块制定一个新的标识,并将该标识传送给namenode,以便故障datanode在恢复后可以删除存储的部分数据块。从管线中删除故障数据节点并把余下的数据块写入管线中的两个正常的datanode。namenode注意到复本量不足时,会在另一个节点上创建一个新的复本。
  客户端完成数据的写入后,会对数据流调用close()方法(step6)。该操作将剩余的所有数据包写入datanode管线中,并在联系namenode且发送文件写入完成信号之前,等待确认(step 7)。
  3. 复本的布局
  Hadoop的默认布局策略是在运行客户端的节点上放第1个复本,第2个复本放在与第1个不同且随机另外选择的机架中节点上。其他复本放在集群中随机选择的节点上。一旦选定复本的放置位置,就会根据网络拓扑创建一个管线,如果复本数为3,则有如图所示的管线:
DSC0002.jpg

  4 一致模型
  在创建一个文件后,希望能在文件系统的命名空间中立即可见,如:
  

Path p = new Path("p");  
Fs.create(p);
  
assertThat(fs.exists(p),is(
true))  

  但是写入文件的内容并不能保证立即可见,即使数据流已经刷新并存储,所以文件长度显示为0:
  

Path p = new Path("p");  
OutputStream out
= fs.create(p);  
out.write(
"content".getBytes("UTF-8"));  
out.flush();
  
assertThat(fs.getFileStatus(p).getLen(),is(
0L))  

  当写入的数据超过一个块后,新的reader就能看见第一个块,总之,其他reader不能看见正在写入的块。
  HDFS调用FSDataOutputStream调用sync()方法可以强制所有的缓存与数据节点同步,因此,对所有的reader,HDFS能保证文件中到目前为止写入的数据均可见且一致。
  

Path p = new Path("p");  
FSDataOutputStream out
= fs.create(p);  
out.write(
"content".getBytes("UTF-8"));  
out.flush();
  
out.sync();
  
assertThat(fs.getFileStatus(p).getLen(),is((
long)"content".length()));  

  5. 通过distcp并行复制
  distcp的典型应用就是在两个HDFS集群之间传输数据,如果两个集群运行相同版本的Hadoop,就非常适合使用hdfs方案:
  % hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar
  该指令将第一个集群的/foo目录复制到第二个集群的/bar目录下,注意:源路径必须是绝对路径!
  默认情况下,distcp会跳过目标路径下已经存在的文件,但可通过-overwrite选项覆盖现有的文件。也可以通过-update选项来选择仅更新修改过的文件。
  当改变先前例子中第一个集群/foo子树下的一个文件,可通过如下命令将修改同步到第二个集群上。
  % hadoop distcp -update hdfs://namenode1/foo hdfs://namenode2/bar/foo
  当试图在两个运行着不同HDFS版本的集群上使用distcp复制数据,并且使用hdfs协议,由于两个版本的RPC系统不兼容,因此复制作业失败。可使用基于只读HTTP协议的HFTP文件系统并从源文件系统中读取数据。
  % hadoop distcp hftp://namenode1:50070/foo hdfs://namenode2/bar
  6. Hadoop存档
  Hadoop存档文件或HAR文件,将文件存入HDFS块,减少namenode内存使用的同时,还能允许对文件进行透明的访问。
  

// 希望存档的文档  
% hadoop fs -lsr /my/files
  

  
// 运行archive指令存档,第一个选项files.har是存档文件的名称,
  
// /my/files是需要存档的文件,/my是HAR文件的输出目录
  
% hadoop archive -archiveName files.har  /my/files /my
  

  
// 查看创建的文档
  
% hadoop fs -ls /my
  

  
// 如下命令可现实HAR文件的组成部分:两个索引文件及部分文件的集合
  
% hadoop fs -ls /my/files.har
  

  
// 递归列出存档文件中的部分文件
  
% hadoop fs -lsr har:///my/files.har
  

  
// 删除HAR文件,对于基础文件系统,HAR文件是一个目录
  
% hadoop fs -rmr /my/files.bar
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-424860-1-1.html 上篇帖子: Hadoop/Spark开发环境配置 下篇帖子: hadoop系列一:hadoop集群安装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表