java海量数据处理(千万级别)(2)-海量数据FTP下载
这个也是曾经做过的一个程序,目的主要是去ftp主机(最多100左右)去取xx数据文件.千万级别仅仅是个概念,代表数据量等于千万或者大于千万的数据
本分享不牵扯分布式採集存储之类的.是在一台机器上处理数据,假设数据量非常大非常大的话,能够考虑分布式处理,假设以后我有这方面的经验,会及时分享的.
1、程序採用的ftp工具, apache 的 commons-net-ftp-2.0.jar
2、千万级别ftp核心关键的部分--列文件夹到文件,仅仅要是这块做好了,基本上性能就沒有太大的问题了.
能够通过apache 发送ftp命令 "NLST" 的方式列文件夹到文件里去
# ftp列文件夹运行的命令 以环境变量的配置优先,不配置则使用默认的列文件夹方式 NLST
Java代码
[*]# DS_LIST_CMD = NLST
[*]public File sendCommandAndListToFile(String command,String localPathName) throws IOException
[*] {
[*] try {
[*] return client.createFile(command, localPathName);
[*] } catch (IOException e) {
[*] log.error(e);
[*] throw new IOException("the command "+command +" is incorrect");
[*] }
[*] }
当然应该还有其它形式的,大家能够自己研究一下
十万级别以上的数据量的话千万不要使用下面这样的方式,假设用的话 ==== 找死
FTPFile[] dirList = client.listFiles();
3、分批次从文件里读取 要下载的文件名称.载入到内存中处理,或者读取一个文件名称就下载一个文件,不要把全部的数据都载入到内存,假设非常多的话会出问题
为啥要分批次?
由于是大数据量,假设有1000W条记录,列出来的文件夹文件的大小 1G以上吧
4、文件下载的核心代码----关于文件的断点续传, 获得ftp文件的大小和本地文件的大小进行推断,然后使用ftp提供的断点续传功能
下载文件一定要使用二进制的形式
client.enterLocalPassiveMode();// 设置为被动模式
ftpclient.binary();// 一定要使用二进制模式
Java代码
[*]/** 下载所需的文件并支持断点续传,下载后删除FTP文件,以免反复
[*] * @param pathName 远程文件
[*] * @param localPath 本地文件
[*] * @param registerFileName 记录本文件名称称文件夹
[*] * @param size 上传文件大小
[*] * @return true 下载及删除成功
[*] * @throws IOException
[*] * @throws Exception
[*] */
[*] public boolean downLoad(String pathName, String localPath) throws IOException {
[*] boolean flag = false;
[*] File file = new File(localPath+".tmp");//设置暂时文件
[*] FileOutputStream out = null;
[*] try{
[*] client.enterLocalPassiveMode();// 设置为被动模式
[*] client.setFileType(FTP.BINARY_FILE_TYPE);//设置为二进制传输
[*] if(lff.getIsFileExists(file)){//推断本地文件是否存在,假设存在而且长度小于FTP文件的长度时断点续传;返之新增
[*] long size = this.getSize(pathName);
[*] long localFileSize = lff.getSize(file);
[*] if(localFileSize > size){
[*] return false;
[*] }
[*] out = new FileOutputStream(file,true);
[*] client.setRestartOffset(localFileSize);
[*] flag = client.retrieveFile(new String(pathName.getBytes(),client.getControlEncoding()),out);
[*]
[*] out.flush();
[*] } else{
[*] out = new FileOutputStream(file);
[*] flag = client.retrieveFile(new String(pathName.getBytes(),client.getControlEncoding()),out);
[*]
[*] out.flush();
[*] }
[*]
[*] }catch(IOException e){
[*] log.error(e);
[*] log.error("file download error !");
[*] throw e;
[*] }finally{
[*] try{
[*] if(null!=out)
[*] out.close();
[*] if(flag)
[*] lff.rename(file, localPath);
[*] }catch(IOException e){
[*] throw e;
[*] }
[*] }
[*] return flag;
[*] }
[*] /**
[*] * 获取文件长度
[*] * @param fileNamepath 本机文件
[*] * @return
[*] * @throws IOException
[*] */
[*] public long getSize(String fileNamepath) throws IOException{
[*] FTPFile [] ftp = client.listFiles(new String(fileNamepath.getBytes(),client.getControlEncoding()));
[*] return ftp.length==0 ? 0 : ftp.getSize();
[*] }
[*]
[*] 检測本地文件是否已经下载,假设下载文件的大小.
[*]
[*] /**
[*] *本地文件的 获取文件的大小
[*] * @param file
[*] * @return
[*] */
[*] public long getSize(File file){
[*] long size = 0;
[*] if(getIsFileExists(file)){
[*] size = file.length();
[*] }
[*] return size;
[*] }
5、由于程序要跑最多100多个线程,在线程监控上做了一些处理,能够检測那些死掉的线程,并及时的拉起来。
t.setUncaughtExceptionHandler(new ThreadException(exList));
原理:给每一个线程增加 UncaughtExceptionHandler,死掉的时候把线程相应的信息添加到一个list里面,然后让主线程每隔一段时间扫描一下list,假设有数据,直接又一次建一个线程运行就可以
6、假设程序是常驻内存的话,别忘记了在finally中关闭掉 不用的ftp连接
7、做大数据库採集程序必须考虑到的一件事情 磁盘空间已满的处理
java虚拟机对于磁盘空间已满,在英文环境下的linux aix 机器上 一般报
There is not enough space in the file system
中文环境下 一般报 "磁盘空间已满"
大家能够使用以下的代码进行验证
Java代码
[*]//检測磁盘控件是否已满的异常
Java代码
[*]//linux aix There is not enough space in the file system
[*] // window There is not enough space in the file system
[*] if(e.toString().contains("enough space")||e.toString().contains("磁盘空间已满"))
[*] {
[*] log.error("channel "+channel_name + " There is not enough space on the disk ");
[*] Runtime.getRuntime().exit(0);
[*] }
原创文章 by dyllove98 @ http://jlins.iteye.com
原创文章 by dyllove98 @ http://blog.iyunv.com/dyllove98
转载请表明出处。
页:
[1]