|
1 package zhouls.bigdata.myMapReduce.ScoreCount;
2
3 import java.io.IOException;
4 import org.apache.hadoop.conf.Configuration;
5 import org.apache.hadoop.fs.FSDataInputStream;
6 import org.apache.hadoop.fs.FileSystem;
7 import org.apache.hadoop.fs.Path;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.mapreduce.InputSplit;
10 import org.apache.hadoop.mapreduce.JobContext;
11 import org.apache.hadoop.mapreduce.RecordReader;
12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
15 import org.apache.hadoop.util.LineReader;
16 /**
17 * 自定义学生成绩读写InputFormat
18 * 数据格式参考:19020090017 小讲 90 99 100 89 95
19 * @author Bertron
20 */
21
22 //其实这个程序,就是在实现InputFormat接口,TVPlayInputFormat是InputFormat接口的实现类
23 //比如 ScoreInputFormat extends FileInputFormat implements InputFormat。
24
25 //问:自定义输入格式 ScoreInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。
26
27 public>
28
29 // 线路是: boolean isSplitable() -> RecordReader<Text,ScoreWritable> createRecordReader() -> ScoreRecordReader extends RecordReader<Text, ScoreWritable >
30
31 @Override
32 protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
33 //isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。
34 // 如果不允许分割,则isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中
35 // 如果文件长度不为0且支持分割,则isSplitable==true,获取block大小,默认是64MB
36 return false; //整个文件封装到一个InputSplit
37 //要么就是return true; //切分64MB大小的一块一块,再封装到InputSplit
38 }
39
40 @Override
41 public RecordReader<Text,ScoreWritable> createRecordReader(InputSplit inputsplit,TaskAttemptContext context) throws IOException, InterruptedException {
42 // RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
43 // createRecordReader是方法,在这里是,ScoreInputFormat.createRecordReader。ScoreInputFormat是InputFormat类的实例
44 // InputSplit input和TaskAttemptContext context是传入参数
45
46 // isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
47 // isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
48
49 //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类ScoreRecordReader。
50 //类似与Excel、WeiBo、TVPlayData代码写法
51 return new ScoreRecordReader();//新建一个ScoreRecordReader实例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,写我们自己的
52 }
53
54
55 //RecordReader中的两个参数分别填写我们期望返回的key/value类型,我们期望key为Text类型,value为ScoreWritable类型封装学生所有成绩
56 public static>
57 public LineReader in;//行读取器
58 public Text line;//每行数据类型
59 public Text lineKey;//自定义key类型,即k1
60 public ScoreWritable lineValue;//自定义value类型,即v1
61
62 @Override
63 public void close() throws IOException {//关闭输入流
64 if(in !=null){
65 in.close();
66 }
67 }
68 @Override
69 public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
70 return lineKey;//返回类型是Text,即Text lineKey
71 }
72 @Override
73 public ScoreWritable getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
74 return lineValue;//返回类型是ScoreWritable,即ScoreWritable lineValue
75 }
76 @Override
77 public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
78 return 0;//返回类型是float,即float 0
79 }
80 @Override
81 public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {//初始化,都是模板
82 FileSplit split=(FileSplit)input;
83 Configuration job=context.getConfiguration();
84 Path file=split.getPath();
85 FileSystem fs=file.getFileSystem(job);
86
87 FSDataInputStream filein=fs.open(file);
88 in=new LineReader(filein,job);//输入流in
89 line=new Text();//每行数据类型
90 lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
91 lineValue = new ScoreWritable();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
92 }
93
94 //此方法读取每行数据,完成自定义的key和value
95 @Override
96 public boolean nextKeyValue() throws IOException, InterruptedException {//这里面,才是篡改的重点
97 int linesize=in.readLine(line);//line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。
98
99 // 是SplitLineReader.readLine -> SplitLineReader extends LineReader -> org.apache.hadoop.util.LineReader
100
101 // in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
102 // in.readLine(str, maxLineLength)//只读到maxLineLength行
103 // in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值
104
105 if(linesize==0) return false;
106
107
108 String[] pieces = line.toString().split("\\s+");//解析每行数据
109 //因为,我们这里是。默认读完读到文件末尾。line是Text类型。pieces是String[],即String数组。
110
111 if(pieces.length != 7){
112 throw new IOException("Invalid record received");
113 }
114 //将学生的每门成绩转换为 float 类型
115 float a,b,c,d,e;
116 try{
117 a = Float.parseFloat(pieces[2].trim());//将String类型,如pieces[2]转换成,float类型,给a
118 b = Float.parseFloat(pieces[3].trim());
119 c = Float.parseFloat(pieces[4].trim());
120 d = Float.parseFloat(pieces[5].trim());
121 e = Float.parseFloat(pieces[6].trim());
122 }catch(NumberFormatException nfe){
123 throw new IOException("Error parsing floating poing value in record");
124 }
125 lineKey.set(pieces[0]+"\t"+pieces[1]);//完成自定义key数据
126 lineValue.set(a, b, c, d, e);//封装自定义value数据
127 // 或者写
128 // lineValue.set(Float.parseFloat(pieces[2].trim()),Float.parseFloat(pieces[3].trim()),Float.parseFloat(pieces[4].trim()),
129 // Float.parseFloat(pieces[5].trim()),Float.parseFloat(pieces[6].trim()));
130
131 // pieces[0] pieces[1] pieces[2] ... pieces[6]
132 // 19020090040 秦心芯 123 131 100 95 100
133 // 19020090006 李磊 99 92 100 90 100
134 // 19020090017 唐一建 90 99 100 89 95
135 // 19020090031 曾丽丽 100 99 97 79 96
136 // 19020090013 罗开俊 105 115 94 45 100
137 // 19020090039 周世海 114 116 93 31 97
138 // 19020090020 王正伟 109 98 88 47 99
139 // 19020090025 谢瑞彬 94 120 100 50 73
140 // 19020090007 于微 89 78 100 66 99
141 // 19020090012 刘小利 87 82 89 71 99
142
143
144
145 return true;
146 }
147 }
148 } |
|