设为首页 收藏本站
查看: 1006|回复: 0

[经验分享] Hadoop之HDFS原理及文件上传下载源码分析(上)

[复制链接]

尚未签到

发表于 2017-12-16 21:31:10 | 显示全部楼层 |阅读模式
HDFS原理
  首先说明下,hadoop的各种搭建方式不再介绍,相信各位玩hadoop的同学随便都能搭出来。
  楼主的环境:


  •   操作系统:Ubuntu 15.10
  •   hadoop版本:2.7.3
  •   HA:否(随便搭了个伪分布式)
文件上传
  下图描述了Client向HDFS上传一个200M大小的日志文件的大致过程:
DSC0000.png

  首先,Client发起文件上传请求,即通过RPC与NameNode建立通讯。
  NameNode与各DataNode使用心跳机制来获取DataNode信息。NameNode收到Client请求后,获取DataNode信息,并将可存储文件的节点信息返回给Client。
  Client收到NameNode返回的信息,与对应的DataNode节点取得联系,并向该节点写文件。
  文件写入到DataNode后,以流水线的方式复制到其他DataNode(当然,这里面也有DataNode向NameNode申请block,这里不详细介绍),至于复制多少份,与所配置的hdfs-default.xml中的dfs.replication相关。

  元数据存储
  先明确几个概念:
  fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。
  edits:操作日志文件。
  fstime:保存最近一次checkpoint的时间
  checkpoint可在hdfs-default.xml中具体配置,默认为3600秒:
  

<property>  
   <name>dfs.namenode.checkpoint.period</name>
  
   <value>3600</value>
  
   <description>The number of seconds between two periodic checkpoints.
  
   </description>
  
</property>
  

  fsimage和edits文件在namenode目录可以看到:
DSC0001.png

  NameNode中的元数据信息:
DSC0002.png

  test.log文件上传后,Namenode始终在内存中保存metedata,用于处理“读请求”。metedata主要存储了文件名称(FileName),副本数量(replicas),分多少block存储(block-ids),分别存储在哪个节点上(id2host)等。
  到有“写请求”到来时,namenode会首先写editlog到磁盘,即向edits文件中写日志,成功返回后,才会修改内存,并且向客户端返回
  hadoop会维护一个fsimage文件,也就是namenode中metedata的镜像,但是fsimage不会随时与namenode内存中的metedata保持一致,而是每隔一段时间通过合并edits文件来更新内容。此时Secondary namenode就派上用场了,合并fsimage和edits文件并更新NameNode的metedata。
  Secondary namenode工作流程:


  • secondary通知namenode切换edits文件
  • secondary通过http请求从namenode获得fsimage和edits文件
  • secondary将fsimage载入内存,然后开始合并edits
  • secondary将新的fsimage发回给namenode
  • namenode用新的fsimage替换旧的fsimage
  通过一张图可以表示为:
DSC0003.png


文件下载
  文件下载相对来说就简单一些了,如图所示,Client要从DataNode上,读取test.log文件。而test.log由block1和block2组成。
DSC0004.png

  文件下载的主要流程为:


  • client向namenode发送请求。
  • namenode查看Metadata信息,返回test.log的block的位置。
  Block1: h0,h1,h3
  Block2: h0,h2,h4


  • 开始从h0节点下载block1,block2。
源码分析
  我们先简单使用hadoop提供的API来实现文件的上传下载(文件删除、改名等操作比较简单,这里不演示):
  

package cn.jon.hadoop.hdfs;  

  
import java.io.FileInputStream;
  
import java.io.FileOutputStream;
  
import java.io.IOException;
  
import java.io.InputStream;
  
import java.io.OutputStream;
  
import java.net.URI;
  
import java.net.URISyntaxException;
  

  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.fs.FileSystem;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.IOUtils;
  
import org.junit.Before;
  
import org.junit.Test;
  


  
public>  
     FileSystem fs = null;   
  
     @Before
  
     public void init(){
  
         try {
  
             //初始化文件系统
  
             fs = FileSystem.get(new URI("hdfs://hadoopmaster:9000"), new Configuration(), "root");
  
         } catch (IOException e) {
  
             e.printStackTrace();
  
         } catch (InterruptedException e) {
  
             e.printStackTrace();
  
         } catch (URISyntaxException e) {
  
             e.printStackTrace();
  
         }
  
     }
  
     public static void main(String[] args) {
  
         
  
     }
  
     @Test
  
     /**
  
      * 文件上传
  
      */
  
     public void testFileUpload(){
  
         try {
  
             OutputStream os = fs.create(new Path("/test.log"));
  
             FileInputStream fis = new FileInputStream("I://test.log");
  
             IOUtils.copyBytes(fis, os, 2048,true);
  
             //可以使用hadoop提供的简单方式
  
             fs.copyFromLocalFile(new Path("I://test.log"), new Path("/test.log"));
  
         } catch (IllegalArgumentException | IOException e) {
  
             e.printStackTrace();
  
         }
  
     }
  
     @Test   
  
     /**
  
      * 文件下载
  
      */
  
     public void testFileDownload(){
  
         try {
  
             InputStream is = fs.open(new Path("/test.log"));
  
             FileOutputStream fos = new FileOutputStream("E://test.log");            
  
             IOUtils.copyBytes(is, fos, 2048);
  
             //可以使用hadoop提供的简单方式
  
             fs.copyToLocalFile(new Path("/test.log"), new Path("E://test.log"));
  
         } catch (IllegalArgumentException | IOException e) {
  
             e.printStackTrace();
  
         }
  
     }
  

  
}
  

  显而易见,只要是对hdfs上的文件进行操作,必须对FileSystem进行初始化,我们先来分析FileSystem的初始化:
  

  public static FileSystem get(URI uri, Configuration conf) throws IOException {  

     return CACHE.get(uri, conf);//部分方法我只截取了部分代码,这里进入get()方法  
   }
  

  

    FileSystem get(URI uri, Configuration conf) throws IOException{  

       Key key = new Key(uri, conf);  

       return getInternal(uri, conf, key);//调用getInternal()  
     }
  

  

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{  

      //使用单例模式创建FileSystem,这是由于FS的初始化需要大量的时间,使用单例保证只是第一次加载慢一些,返回FileSystem的子类实现DistributedFileSystem  
       FileSystem fs;
  
       synchronized (this) {
  
         fs = map.get(key);
  
       }
  
       if (fs != null) {
  
         return fs;
  
       }
  

  
       fs = createFileSystem(uri, conf);
  
       synchronized (this) { // refetch the lock again
  
         FileSystem oldfs = map.get(key);
  
         if (oldfs != null) { // a file system is created while lock is>  
           fs.close(); // close the new file system
  
           return oldfs;  // return the old file system
  
         }
  
         
  
         // now insert the new file system into the map
  
         if (map.isEmpty()
  
                 && !ShutdownHookManager.get().isShutdownInProgress()) {
  
           ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
  
         }
  
         fs.key = key;
  
         map.put(key, fs);
  
         if (conf.getBoolean("fs.automatic.close", true)) {
  
           toAutoClose.add(key);
  
         }
  
         return fs;
  
       }
  
     }
  

  

public void initialize(URI uri, Configuration conf) throws IOException {  

     super.initialize(uri, conf);  

     setConf(conf);  

  
     String host = uri.getHost();
  
     if (host == null) {
  
       throw new IOException("Incomplete HDFS URI, no host: "+ uri);
  
     }
  
     homeDirPrefix = conf.get(
  
         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
  
         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
  
     
  
     this.dfs = new DFSClient(uri, conf, statistics);//实例化DFSClient,并将它作为DistributedFileSystem的引用,下面我们跟进去
  
     this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
  
     this.workingDir = getHomeDirectory();
  
   }
  

  

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,  

       Configuration conf, FileSystem.Statistics stats)  

     throws IOException {  

     //该构造太长,楼主只截取了重要部分给大家展示,有感兴趣的同学可以亲手进源码瞧瞧  
     NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
  
     //这里声明了NameNode的代理对象,跟我们前面讨论的rpc就息息相关了
  
     if (proxyInfo != null) {
  
       this.dtService = proxyInfo.getDelegationTokenService();
  
       this.namenode = proxyInfo.getProxy();
  
     } else if (rpcNamenode != null) {   
  
       Preconditions.checkArgument(nameNodeUri == null);
  
       this.namenode = rpcNamenode;
  
       dtService = null;
  
     } else {
  
       Preconditions.checkArgument(nameNodeUri != null,
  
           "null URI");
  
       proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
  
           ClientProtocol.class, nnFallbackToSimpleAuth);
  
       this.dtService = proxyInfo.getDelegationTokenService();
  
       this.namenode = proxyInfo.getProxy();//获取NameNode代理对象引用并自己持有,this.namenode类型为ClientProtocol,它是一个接口,我们看下这个接口
  
     }
  
   }
  

  

public interface ClientProtocol{  

       public static final long versionID = 69L;  

       //还有很多对NameNode操作的方法申明,包括对文件上传,下载,删除等  

       //楼主特意把versionID贴出来了,这就跟我们写的RPCDemo中的MyBizable接口完全类似,所以说Client一旦拿到该接口实现类的代理对象(NameNodeRpcServer),Client就可以实现与NameNode的RPC通信,我们继续跟进  
}
  

  

  public static <T> ProxyAndInfo<T> createProxy(Configuration conf,  

       URI nameNodeUri,>

       throws IOException {  

     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =  
         createFailoverProxyProvider(conf, nameNodeUri, xface, true,
  
           fallbackToSimpleAuth);  
  
     if (failoverProxyProvider == null) {
  
       // 如果不是HA的创建方式,楼主环境是伪分布式,所以走这里,我们跟进去
  
       return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
  
           UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
  
     } else {
  
       // 如果有HA的创建方式
  
       Conf config = new Conf(conf);
  
       T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
  
           RetryPolicies.failoverOnNetworkException(
  
               RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
  
               config.maxRetryAttempts, config.failoverSleepBaseMillis,
  
               config.failoverSleepMaxMillis));
  
       return new ProxyAndInfo<T>(proxy, dtService,
  
           NameNode.getAddress(nameNodeUri));
  
     }
  
   }
  

  最终返回的为ClientProtocol接口的子类代理对象,而NameNodeRpcServer类实现了ClientProtocol接口,因此返回的为NameNode的代理对象,当客户端拿到了NameNode的代理对象后,即与NameNode建立了RPC通信:
  

private static ClientProtocol createNNProxyWithClientProtocol(  

       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,  

       boolean withRetries, AtomicBoolean fallbackToSimpleAuth)  

       throws IOException {  

     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);//是不是感觉越来越像我们前面说到的RPC  

  
     final RetryPolicy defaultPolicy =
  
         RetryUtils.getDefaultRetryPolicy(//加载默认策虐
  
             conf,
  
             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
  
             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
  
             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
  
             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
  
             SafeModeException.class);
  
     
  
     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
  
     //看到versionId了吗?这下明白了rpc的使用中目标接口必须要有这个字段了吧
  
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
  
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
  
         NetUtils.getDefaultSocketFactory(conf),
  
         org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
  
         fallbackToSimpleAuth).getProxy();
  
     //看到没?这里使用 RPC.getProtocolProxy()来创建ClientNamenodeProtocolPB对象,调试时可以清楚的看见,该对象引用的是一个代理对象,值为$Proxy12,由JDK的动态代理来实现。
  
     //前面我们写RPCDemo程序时,用的是RPC.getProxy(),但是各位大家可以去看RPC源码,RPC.getProtocolProxy()最终还是调用的getProxy()
  
     if (withRetries) {
  
       Map<String, RetryPolicy> methodNameToPolicyMap
  
                  = new HashMap<String, RetryPolicy>();   
  
       ClientProtocol translatorProxy =
  
         new ClientNamenodeProtocolTranslatorPB(proxy);
  
       return (ClientProtocol) RetryProxy.create(//这里再次使用代理模式对代理对象进行包装,也可以理解为装饰者模式
  
           ClientProtocol.class,
  
           new DefaultFailoverProxyProvider<ClientProtocol>(
  
               ClientProtocol.class, translatorProxy),
  
           methodNameToPolicyMap,
  
           defaultPolicy);
  
     } else {
  
       return new ClientNamenodeProtocolTranslatorPB(proxy);
  
     }
  
   }
  

  整个FileSystem的初始化用时序图表示为:
DSC0005.png

  到此,FileSystem的初始化就基本完成。由于文章篇幅过大的问题,所以楼主把HDFS原理及源码分析拆分成了两部分,上半部分主要是HDFS原理与FileSystem的初始化介绍,那在下半部分将会具体介绍HDFS文件上传、下载的源码解析。
  另外,文章用到的一些示例代码,将会在下半部分发布后,楼主一起上传到GitHub。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-424838-1-1.html 上篇帖子: hadoop 性能调优与运维 下篇帖子: Hadoop概念学习系列之关于hadoop-2.2.0和hadoop2.6.0的winutils.exe、hadoop.dll版本混用(易出错)(四十三)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表