lmwtzw6u5l0 发表于 2015-5-27 12:56:03

java海量数据处理(千万级别)(2)-海量数据FTP下载

  这个也是曾经做过的一个程序,目的主要是去ftp主机(最多100左右)去取xx数据文件.
  
              千万级别仅仅是个概念,代表数据量等于千万或者大于千万的数据
              本分享不牵扯分布式採集存储之类的.是在一台机器上处理数据,假设数据量非常大非常大的话,能够考虑分布式处理,假设以后我有这方面的经验,会及时分享的.
  
  1、程序採用的ftp工具, apache 的 commons-net-ftp-2.0.jar
  
  2、千万级别ftp核心关键的部分--列文件夹到文件,仅仅要是这块做好了,基本上性能就沒有太大的问题了.
  
  能够通过apache 发送ftp命令 "NLST" 的方式列文件夹到文件里去
  
  # ftp列文件夹运行的命令 以环境变量的配置优先,不配置则使用默认的列文件夹方式 NLST



Java代码
[*]# DS_LIST_CMD = NLST   
[*]public File sendCommandAndListToFile(String command,String localPathName) throws   IOException   
[*]      {
[*]            try {
[*]                      return client.createFile(command, localPathName);
[*]            } catch (IOException e) {
[*]                      log.error(e);
[*]          throw new IOException("the command "+command +" is incorrect");
[*]            }
[*]      }
  
              当然应该还有其它形式的,大家能够自己研究一下
  
              十万级别以上的数据量的话千万不要使用下面这样的方式,假设用的话 ==== 找死
  
               FTPFile[] dirList = client.listFiles();
  
  3、分批次从文件里读取 要下载的文件名称.载入到内存中处理,或者读取一个文件名称就下载一个文件,不要把全部的数据都载入到内存,假设非常多的话会出问题
     
              为啥要分批次?
              由于是大数据量,假设有1000W条记录,列出来的文件夹文件的大小 1G以上吧
  
  
  4、文件下载的核心代码----关于文件的断点续传, 获得ftp文件的大小和本地文件的大小进行推断,然后使用ftp提供的断点续传功能
              下载文件一定要使用二进制的形式
              client.enterLocalPassiveMode();// 设置为被动模式
              ftpclient.binary();// 一定要使用二进制模式
  
  
     



Java代码
[*]/** 下载所需的文件并支持断点续传,下载后删除FTP文件,以免反复
[*]         * @param pathName 远程文件
[*]         * @param localPath 本地文件
[*]         * @param registerFileName 记录本文件名称称文件夹
[*]         * @param size 上传文件大小
[*]         * @return true 下载及删除成功
[*]         * @throws IOException
[*]         * @throws Exception
[*]         */
[*]      public boolean downLoad(String pathName, String localPath) throws IOException {
[*]                boolean flag = false;
[*]                File file = new File(localPath+".tmp");//设置暂时文件
[*]                FileOutputStream out = null;
[*]                try{
[*]                  client.enterLocalPassiveMode();// 设置为被动模式
[*]                  client.setFileType(FTP.BINARY_FILE_TYPE);//设置为二进制传输
[*]                        if(lff.getIsFileExists(file)){//推断本地文件是否存在,假设存在而且长度小于FTP文件的长度时断点续传;返之新增
[*]                              long size = this.getSize(pathName);
[*]                              long localFileSize = lff.getSize(file);         
[*]                              if(localFileSize > size){
[*]                                        return false;
[*]                              }
[*]                              out = new FileOutputStream(file,true);
[*]                              client.setRestartOffset(localFileSize);
[*]                              flag = client.retrieveFile(new String(pathName.getBytes(),client.getControlEncoding()),out);
[*]                                 
[*]                              out.flush();
[*]                        } else{
[*]                              out = new FileOutputStream(file);
[*]                              flag = client.retrieveFile(new String(pathName.getBytes(),client.getControlEncoding()),out);
[*]                                 
[*]                              out.flush();
[*]                        }
[*]                        
[*]                }catch(IOException e){
[*]                        log.error(e);
[*]            log.error("file download error !");
[*]                        throw e;
[*]                }finally{
[*]                        try{
[*]                              if(null!=out)
[*]                              out.close();
[*]                              if(flag)
[*]                                        lff.rename(file, localPath);
[*]                        }catch(IOException e){
[*]                              throw e;
[*]                        }
[*]                }
[*]                return flag;
[*]      }
[*]    /**
[*]         * 获取文件长度
[*]         * @param fileNamepath 本机文件
[*]         * @return
[*]         * @throws IOException
[*]         */
[*]      public long getSize(String fileNamepath) throws IOException{
[*]                FTPFile [] ftp = client.listFiles(new String(fileNamepath.getBytes(),client.getControlEncoding()));
[*]                return ftp.length==0 ? 0 : ftp.getSize();
[*]      }
[*]
[*]      检測本地文件是否已经下载,假设下载文件的大小.
[*]
[*]       /**
[*]         *本地文件的 获取文件的大小
[*]         * @param file
[*]         * @return
[*]         */
[*]      public long getSize(File file){
[*]                long size = 0;
[*]                if(getIsFileExists(file)){
[*]                        size = file.length();
[*]                }
[*]                return size;
[*]      }
  
  5、由于程序要跑最多100多个线程,在线程监控上做了一些处理,能够检測那些死掉的线程,并及时的拉起来。
              t.setUncaughtExceptionHandler(new ThreadException(exList));
  原理:给每一个线程增加 UncaughtExceptionHandler,死掉的时候把线程相应的信息添加到一个list里面,然后让主线程每隔一段时间扫描一下list,假设有数据,直接又一次建一个线程运行就可以
  
  6、假设程序是常驻内存的话,别忘记了在finally中关闭掉 不用的ftp连接
  
  7、做大数据库採集程序必须考虑到的一件事情   磁盘空间已满的处理
  
         java虚拟机对于磁盘空间已满,在英文环境下的linux aix 机器上 一般报
         There is not enough space in the file system
         中文环境下 一般报 "磁盘空间已满"
         大家能够使用以下的代码进行验证
        
                          



Java代码
[*]//检測磁盘控件是否已满的异常



Java代码
[*]//linux aix There is not enough space in the file system
[*]                     // window There is not enough space in the file system
[*]                     if(e.toString().contains("enough space")||e.toString().contains("磁盘空间已满"))
[*]                     {
[*]                           log.error("channel "+channel_name + " There is not enough space on the disk ");
[*]                           Runtime.getRuntime().exit(0);
[*]                     }
  
  
  
              
                    原创文章 by dyllove98 @ http://jlins.iteye.com
  原创文章 by dyllove98 @ http://blog.iyunv.com/dyllove98
  转载请表明出处。
页: [1]
查看完整版本: java海量数据处理(千万级别)(2)-海量数据FTP下载