设为首页 收藏本站
查看: 1223|回复: 0

[经验分享] Hadoop MapReduce编程 API入门系列之小文件合并(二十九)

[复制链接]

尚未签到

发表于 2017-12-18 10:51:45 | 显示全部楼层 |阅读模式
package zhouls.bigdata.myMapReduce.MergeSmallFiles;  

  
import java.io.IOException;
  
import java.net.URI;
  
import java.net.URISyntaxException;
  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.fs.FSDataInputStream;
  
import org.apache.hadoop.fs.FSDataOutputStream;
  
import org.apache.hadoop.fs.FileStatus;
  
import org.apache.hadoop.fs.FileSystem;
  
import org.apache.hadoop.fs.FileUtil;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.fs.PathFilter;
  
import org.apache.hadoop.io.IOUtils;
  

/**  
* function 合并小文件至 HDFS     ,  文件与块大小(比如128M)来比,小的话,称为小文件。是一个相对概念!相对于数据块而言的!
  
* @author 小讲
  
*
  

*/
  
public>  
private static FileSystem fs = null;
  
private static FileSystem local = null;
  
/**
  
* @function main
  
* @param args
  
* @throws IOException
  
* @throws URISyntaxException
  
*/
  
public static void main(String[] args) throws IOException,
  
URISyntaxException {
  
list();
  
}
  

  
/**
  
*
  
* @throws IOException
  
* @throws URISyntaxException
  
*/
  
public static void list() throws IOException, URISyntaxException {
  
// 读取hadoop文件系统的配置
  
Configuration conf = new Configuration();
  
//文件系统访问接口
  
URI uri = new URI("hdfs://master:9000");
  

  
//        URL、URI与Path三者的区别
  
//        Hadoop文件系统中通过Hadoop Path对象来代表一个文件   
  
//        URL(相当于绝对路径)    ->   (文件) ->    URI(相当于相对路径,即代表URL前面的那一部分)
  
//        URI:如hdfs://master:9000
  
//        如,URL.openStream
  

  

  

  
//获得FileSystem实例,即HDFS
  
fs = FileSystem.get(uri, conf);
  

  

  
//获得FileSystem实例,即Local
  
local = FileSystem.getLocal(conf);
  
//            为什么要获取到Local呢,因为,我们要把本地D盘下data/73目录下的文件要合并后,上传到HDFS里,所以,我们需先获取到Local,再来做合并工作啦!
  

  

  
//过滤目录下的 svn 文件,globStatus从第一个参数通配符合到文件,剔除满足第二个参数到结果,因为PathFilter中accept是return!  
  
FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$"));//一般这是隐藏文件,所以得排除
  
//        ^表示匹配我们字符串开始的位置               *代表0到多个字符                        $代表字符串结束的位置
  
//        RegexExcludePathFilter来只排除我们不需要的,即svn格式
  
//        RegexExcludePathFilter这个方法我们自己写
  

  
//        但是我们,最终是要处理文件里的东西,最终是要转成Path类型,因为Path对象f,它对应着一个文件。
  

  
//获取73目录下的所有文件路径,注意FIleUtil中stat2Paths()的使用,它将一个FileStatus对象数组转换为Path对象数组。
  
Path[] dirs = FileUtil.stat2Paths(dirstatus);//dirstatus是FileStatus数组类型
  
        
  

  
FSDataOutputStream out = null;//输出流
  
FSDataInputStream in = null;//输入流
  

  
//        很多人搞不清输入流和输出流,!!!!
  
//        其实啊,输入流、输出流都是针对内存的
  
//        往内存里写,是输入流。
  
//        内存往文件里写,是输出Luis。
  
//        
  
//        比如一个文件A复制到另一文件B,那么,先写到内存里,再写到文件B。
  
//           =>   则文件A写到内存里,叫输入流。
  
//           =>    则内存里写到文件B,叫输出流   
  

  

  
for (Path dir : dirs) {//for星型循环,即将dirs是Path对象数组,一一传给Path dir
  
            
  
String fileName = dir.getName().replace("-", "");//文件名称
  
//                        即获取到如2012-09-17,然后经过replace("-", ""),得到20120917
  

  

  
//只接受日期目录下的.txt文件,^匹配输入字符串的开始位置,$匹配输入字符串的结束位置,*匹配0个或多个字符。
  
FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
  
//            FileStatus[] localStatus = local.listStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));//试试,看有什么区别?出现错误的!为什么?
  

  

  
//RegexAcceptPathFilter这个方法,我们自己写
  
//            RegexAcceptPathFilter来只接收我们需要,即txt格式
  
//            这里,我们还可以只接收别的格式,自己去改,一定要锻炼学会改别人的代码
  

  

  
// 获得如2012-09-17日期目录下的所有文件
  
Path[] listedPaths = FileUtil.stat2Paths(localStatus);
  
//            同样,但是我们,最终是要处理文件里的东西,最终是要转成Path类型,因为Path对象f,它对应着一个文件。
  

  

  

  
//输出路径
  
Path block = new Path("hdfs://master:9000/outData/MergeSmallFilesToHDFS/"+ fileName + ".txt");
  

  

  
// 打开输出流
  
out = fs.create(block);//因为,合并小文件之后,比如这是,合并2012-09-17日期目录下的所有小文件,之后,要上传到HDFS里。
  
//                类似于,文件A写到内存里,再内存里写到文件B。而这行代码out = fs.create(block);是相当于是,内存里写到文件B。所以是输出流,即是从内存里输出的,所以叫输出流。
  
//                            这里,文件A是Local                文件B是HDFS
  

  
//                                        文件与块大小(比如128M)来比,小的话,称为小文件。是一个相对概念!相对于数据块而言的!
  

  
//            很多人搞不清输入流和输出流,!!!!
  
//            其实啊,输入流、输出流都是针对内存的
  
//            往内存里写,是输入流。
  
//            内存往文件里写,是输出Luis。
  
//            
  
//            比如一个文件A复制到另一文件B,那么,先写到内存里,再写到文件B。
  
//               =>   则文件A写到内存里,叫输入流。
  
//               =>    则内存里写到文件B,叫输出流   
  

  

  
for (Path p : listedPaths) {//for星型循环,即将listedPaths的值一一传给Path p
  
in = local.open(p);// 打开输入流in
  
//                类似于,文件A写到内存里,再内存里写到文件B。而这行代码in = local.open(p);是相当于是,文件A写到内存里。所以是输如流,即是写到内存里的,所以叫输入流。
  
//                    这里,文件A是Local                文件B是HDFS
  
               
  
IOUtils.copyBytes(in, out, 4096, false); // 复制数据,IOUtils.copyBytes可以方便地将数据写入到文件,不需要自己去控制缓冲区,也不用自己去循环读取输入源。false表示不自动关闭数据流,那么就手动关闭。
  
//                IOUtils.copyBytes这个方法很重要
  
//是否自动关闭输入流和输出流,若是false,就要单独去关闭。则不在这个方法体里关闭输入和输出流了。
  
//                                                     若是true,则在这个方法里关闭输入和输出流。不需单独去关闭了
  

  

  
//                明白,IOUtils类的copyBytes将hdfs数据流拷贝到标准输出流System.out中,
  
//                copyBytes前两个参数好理解,一个输入,一个输出,第三个是缓存大小,第四个指定拷贝完毕后是否关闭流。
  
//                要设置为false,标准输出流不关闭,我们要手动关闭输入流。即,设置为false表示关闭输入流
  

  
//                主要是把最后的这个参数定义好, 就可以了。 定义为true还是false,则决定着是否在这个方法体里关闭
  
//                若定义为true,则在这个方法体里直接关闭输入流、输出流。不需单独去关闭了
  
//                若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了
  

  

  
// 关闭输入流
  
in.close();//若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了。这就是单独在关闭输入流!!!懂了吗
  
            }
  
if (out != null) {//这里为什么不为空,空指针,则说明里面还有资源。
  
// 关闭输出流
  
out.close();//若定义为false,则不在这个方法体里直接关闭输入流、输出流。需单独去关闭了。这就是单独在关闭输出流!!!懂了吗
  
            }
  
}
  

  
}
  

  
/**
  
*
  
* @function 过滤 regex 格式的文件
  
*
  
*/

  
public static>  
private final String regex;//变量
  

  
public RegexExcludePathFilter(String regex) {//这个是上面的那个,正在表达式
  
this.regex = regex;//将String regex的值,赋给RegexExcludePathFilter类里的private final String regex的值
  
        }
  

  
public boolean accept(Path path) {//主要是实现accept方法
  
// TODO Auto-generated method stub
  
boolean flag = path.toString().matches(regex);//匹配正则表达式,这里是^.*svn$
  
return !flag;//如果要接收 regex 格式的文件,则accept()方法就return flag; 如果想要过滤掉regex格式的文件,则accept()方法就return !flag。
  
        }
  

  
}
  

  
/**
  
*
  
* @function 接受 regex 格式的文件
  
*
  
*/

  
public static>  
private final String regex;//变量
  

  
public RegexAcceptPathFilter(String regex) {//这个是上面的那个,正在表达式
  
this.regex = regex;//将String regex的值,赋给RegexAcceptPathFilter类里的private final String regex的值
  
        }
  

  
public boolean accept(Path path) {//主要是实现accept方法
  
// TODO Auto-generated method stub
  
boolean flag = path.toString().matches(regex);//匹配正则表达式,这里是^.*txt$
  
return flag;//如果要接收 regex 格式的文件,则accept()方法就return flag; 如果想要过滤掉regex格式的文件,则accept()方法就return !flag。
  
        }
  

  
}
  
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425327-1-1.html 上篇帖子: Hadoop MapReduce编程 API入门系列之推荐系统(十三) 下篇帖子: Hadoop MapReduce编程 API入门系列之二次排序(十六)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表