设为首页 收藏本站
查看: 848|回复: 0

[经验分享] Hadoop MapReduce编程 API入门系列之MapReduce多种输入格式(十七)

[复制链接]

尚未签到

发表于 2017-12-18 13:18:26 | 显示全部楼层 |阅读模式
1 package zhouls.bigdata.myMapReduce.ScoreCount;  

  2  
  3 import java.io.IOException;
  
  4 import org.apache.hadoop.conf.Configuration;
  
  5 import org.apache.hadoop.fs.FSDataInputStream;
  
  6 import org.apache.hadoop.fs.FileSystem;
  
  7 import org.apache.hadoop.fs.Path;
  
  8 import org.apache.hadoop.io.Text;
  
  9 import org.apache.hadoop.mapreduce.InputSplit;
  
10 import org.apache.hadoop.mapreduce.JobContext;
  
11 import org.apache.hadoop.mapreduce.RecordReader;
  
12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
  
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
14 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  
15 import org.apache.hadoop.util.LineReader;
  
16 /**
  
17 * 自定义学生成绩读写InputFormat
  
18 * 数据格式参考:19020090017 小讲 90 99 100 89 95
  
19 * @author Bertron
  
20 */
  
21
  
22             //其实这个程序,就是在实现InputFormat接口,TVPlayInputFormat是InputFormat接口的实现类
  
23             //比如   ScoreInputFormat  extends FileInputFormat implements InputFormat。
  
24
  
25             //问:自定义输入格式 ScoreInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。
  
26

  
27 public>  
28
  
29 //    线路是: boolean  isSplitable()   ->   RecordReader<Text,ScoreWritable> createRecordReader()   ->   ScoreRecordReader extends RecordReader<Text, ScoreWritable >
  
30     
  
31     @Override
  
32     protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
  
33             //isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。
  
34 //        如果不允许分割,则isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中
  
35 //        如果文件长度不为0且支持分割,则isSplitable==true,获取block大小,默认是64MB
  
36         return false;    //整个文件封装到一个InputSplit
  
37         //要么就是return true;        //切分64MB大小的一块一块,再封装到InputSplit
  
38     }
  
39     
  
40     @Override
  
41     public RecordReader<Text,ScoreWritable> createRecordReader(InputSplit inputsplit,TaskAttemptContext context) throws IOException, InterruptedException {
  
42 //        RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
  
43 //        createRecordReader是方法,在这里是,ScoreInputFormat.createRecordReader。ScoreInputFormat是InputFormat类的实例
  
44 //        InputSplit input和TaskAttemptContext context是传入参数
  
45         
  
46 //        isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
  
47 //        isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
  
48         
  
49         //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类ScoreRecordReader。
  
50         //类似与Excel、WeiBo、TVPlayData代码写法
  
51         return new ScoreRecordReader();//新建一个ScoreRecordReader实例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,写我们自己的
  
52     }
  
53     
  
54     
  
55     //RecordReader中的两个参数分别填写我们期望返回的key/value类型,我们期望key为Text类型,value为ScoreWritable类型封装学生所有成绩

  
56     public static>  
57         public LineReader in;//行读取器
  
58         public Text line;//每行数据类型
  
59         public Text lineKey;//自定义key类型,即k1
  
60         public ScoreWritable lineValue;//自定义value类型,即v1
  
61         
  
62         @Override
  
63         public void close() throws IOException {//关闭输入流
  
64             if(in !=null){
  
65                 in.close();
  
66             }
  
67         }
  
68         @Override
  
69         public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
  
70             return lineKey;//返回类型是Text,即Text lineKey
  
71         }
  
72         @Override
  
73         public ScoreWritable getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
  
74             return lineValue;//返回类型是ScoreWritable,即ScoreWritable lineValue
  
75         }
  
76         @Override
  
77         public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
  
78             return 0;//返回类型是float,即float 0
  
79         }
  
80         @Override
  
81         public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {//初始化,都是模板
  
82             FileSplit split=(FileSplit)input;
  
83             Configuration job=context.getConfiguration();
  
84             Path file=split.getPath();
  
85             FileSystem fs=file.getFileSystem(job);
  
86            
  
87             FSDataInputStream filein=fs.open(file);
  
88             in=new LineReader(filein,job);//输入流in
  
89             line=new Text();//每行数据类型
  
90             lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
  
91             lineValue = new ScoreWritable();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
  
92         }
  
93         
  
94         //此方法读取每行数据,完成自定义的key和value
  
95         @Override
  
96         public boolean nextKeyValue() throws IOException, InterruptedException {//这里面,才是篡改的重点
  
97             int linesize=in.readLine(line);//line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。
  
98            
  
99 //            是SplitLineReader.readLine  ->  SplitLineReader  extends   LineReader  ->  org.apache.hadoop.util.LineReader
  
100            
  
101 //            in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
  
102 //            in.readLine(str, maxLineLength)//只读到maxLineLength行
  
103 //            in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值
  
104
  
105             if(linesize==0) return false;
  
106            
  
107            
  
108             String[] pieces = line.toString().split("\\s+");//解析每行数据
  
109                     //因为,我们这里是。默认读完读到文件末尾。line是Text类型。pieces是String[],即String数组。
  
110            
  
111             if(pieces.length != 7){
  
112                 throw new IOException("Invalid record received");
  
113             }
  
114             //将学生的每门成绩转换为 float 类型
  
115             float a,b,c,d,e;
  
116             try{
  
117                 a = Float.parseFloat(pieces[2].trim());//将String类型,如pieces[2]转换成,float类型,给a
  
118                 b = Float.parseFloat(pieces[3].trim());
  
119                 c = Float.parseFloat(pieces[4].trim());
  
120                 d = Float.parseFloat(pieces[5].trim());
  
121                 e = Float.parseFloat(pieces[6].trim());
  
122             }catch(NumberFormatException nfe){
  
123                 throw new IOException("Error parsing floating poing value in record");
  
124             }
  
125             lineKey.set(pieces[0]+"\t"+pieces[1]);//完成自定义key数据
  
126             lineValue.set(a, b, c, d, e);//封装自定义value数据
  
127 //            或者写
  
128 //            lineValue.set(Float.parseFloat(pieces[2].trim()),Float.parseFloat(pieces[3].trim()),Float.parseFloat(pieces[4].trim()),
  
129 //                    Float.parseFloat(pieces[5].trim()),Float.parseFloat(pieces[6].trim()));
  
130            
  
131 //            pieces[0]   pieces[1] pieces[2]  ... pieces[6]
  
132 //            19020090040 秦心芯 123 131 100 95 100
  
133 //            19020090006 李磊 99 92 100 90 100
  
134 //            19020090017 唐一建 90 99 100 89 95
  
135 //            19020090031 曾丽丽 100 99 97 79 96
  
136 //            19020090013 罗开俊 105 115 94 45 100
  
137 //            19020090039 周世海 114 116 93 31 97
  
138 //            19020090020 王正伟 109 98 88 47 99
  
139 //            19020090025 谢瑞彬 94 120 100 50 73
  
140 //            19020090007 于微 89 78 100 66 99
  
141 //            19020090012 刘小利 87 82 89 71 99
  
142            
  
143            
  
144            
  
145             return true;
  
146         }        
  
147     }
  
148 }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425376-1-1.html 上篇帖子: Hadoop MapReduce编程 API入门系列之挖掘气象数据版本3(九) 下篇帖子: Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表