|
1.定制Writable类型
Hadoop中有一套Writable实现,例如:IntWritable、Text等,但是,有时候可能并不能满足自己的需求,这个时候,就需要自己定制Writable类型。
定制分以下几步:
- 需要实现WritableComparable接口:Writable常常作为键值对中的键出现,而MapReduce在map和reduce之间会对键进行排序,因此,作为键的类型必须实现WritableComparable(IntWritable等Hadoop原有的Writable类型也都实现了该接口)
- 需要实现WritableComparable的write()、readFields()、compareTo()方法,并提供一个无参构造方法,在其中初始化所有字段(Hadoop会先用无参构造方法创建对象,再调用readFields()填充数据,字段为null会导致空指针异常)
- 需要重写java.lang.Object中的hashCode()、equals()、toString()方法。
由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法
如果要结合使用TextOutputFormat和定制的Writable,则需重写java.lang.Object的toString()方法
Example:
CreateWritable.java
1 package cn.roboson.writable;
2
3 import java.io.DataInput;
4 import java.io.DataOutput;
5 import java.io.IOException;
6
7 import org.apache.hadoop.io.Text;
8 import org.apache.hadoop.io.WritableComparable;
9 /**
10 * 1.定制一个Writable类型,里面包含两个Text
11 * 2.和IntWritable等原有的Writable类似,它需要实现WritableComparable
12 * 3.由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法
13 * 4.如果要结合使用TextOutputFormat和定制的Writable,则许重写java.lang.Object的toString()方法
14 * @author roboson
15 *
16 */
17
18 public class CreateWritable implements WritableComparable{
19
20 private Text first;
21 private Text second;
22
23 //构造方法,这个是必须要的
24 public CreateWritable(){
25
26 }
27
28 public CreateWritable(String first,String second){
29 set(new Text(first), new Text(second));
30 }
31
32 public CreateWritable(Text first,Text second){
33 set(first,second);
34 }
35
36
37
38 public Text getFirst() {
39 return first;
40 }
41
42 public void setFirst(Text first) {
43 this.first = first;
44 }
45
46 public Text getSecond() {
47 return second;
48 }
49
50 public void setSecond(Text second) {
51 this.second = second;
52 }
53
54 public void set(Text first,Text second){
55 this.first=first;
56 this.second=second;
57 }
58
59
60 @Override
61 public void readFields(DataInput in) throws IOException {
62 // TODO Auto-generated method stub
63 first.readFields(in);
64 second.readFields(in);
65 }
66
67 @Override
68 public void write(DataOutput out) throws IOException {
69 // TODO Auto-generated method stub
70 first.write(out);
71 second.write(out);
72 }
73
74 @Override
75 public int compareTo(CreateWritable other) {
76 // TODO Auto-generated method stub
77 int cmp=first.compareTo(other.getFirst());
78 if(cmp!=0){
79 return cmp;
80 }
81 return second.compareTo(other.getSecond());
82 }
83
84 @Override
85 public int hashCode() {
86 // TODO Auto-generated method stub
87 return first.hashCode()*163 + second.hashCode();
88 }
89
90 @Override
91 public String toString() {
92 // TODO Auto-generated method stub
93 return first+"\t"+second;
94 }
95
96 @Override
97 public boolean equals(Object obj) {
98 // TODO Auto-generated method stub
99 if(obj instanceof CreateWritable){
100 CreateWritable create = (CreateWritable) obj;
101 return first.equals(create.getFirst()) && second.equals(create.getSecond());
102 }
103 return false;
104 }
105
106 }
Writable06.java
1 package cn.roboson.writable;
2
3 import org.apache.hadoop.io.Text;
4
5 public class Writable06 {
6
7 public static void main(String[] args) {
8 CreateWritable createWritable01 = new CreateWritable("Hadoop", "roboson");
9 CreateWritable createWritable02 = new CreateWritable(new Text("Hadoop"), new Text("roboson"));
10
11 //重写了equals()方法,相比叫first 和 second两者都相同的时候,返回true
12 System.out.println(createWritable01.equals(createWritable02));
13
14 //重写了compareTo()方法,先比较first,再比较second,返回0:相等 -1:firstsecond
15 System.out.println(createWritable01.compareTo(createWritable02));
16
17 //重写了toString()方法,返回 first +"\t"+second
18 System.out.println(createWritable01.toString());
19 }
20 }
运行结果:
2.为速度实现一个RawComparator
关于Comparator方面的知识,可以参考我的博文《Hadoop中WritableComparable 和 comparator》。RawComparator中有一个方法compare(byte[] b1,int s1,int l1,byte[] b2,int s2, int l2),该方法直接比较序列化后的字节数据,不必先将序列化数据流反序列化为对象再进行比较,所以与上面的compareTo()方法相比,其效率更高!其实,查看Hadoop API就可以发现,基本类型的Writable类都实现了一个Comparator作为其内部类:
好,再看看IntWritable.Comparator这个类,如下图所示,可以发现它是一个静态内部类,好好观察它的结构:
通过上面的分析可以发现,要按照Hadoop的惯例为一个Writable实现Comparator,需要注意以下几点:
- 将其在内部实现,作为内部类
- 是一个静态的内部类
- 在静态初始化块中调用WritableComparator.define()进行注册,以便可以通过WritableComparator.get()直接获取该Comparator
至于注册方面的知识,可以查看我的博客《Hadoop中Comparator原理》
Example:给上面自定义的CreateWritable类实现一个Comparator,也就是CreateWritable.Comparator类
CreateWritable.java
1 package cn.roboson.writable;
2
3 import java.io.DataInput;
4 import java.io.DataOutput;
5 import java.io.IOException;
6 import java.util.Comparator;
7
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.io.WritableComparable;
10 import org.apache.hadoop.io.WritableComparator;
11 import org.apache.hadoop.io.WritableUtils;
12 /**
13 * 1.给定制的CreateWritable类实现一个Comparator,CreateWritable.Comparator
14 * @author roboson
15 *
16 */
17
18 public class CreateWritable implements WritableComparable{
19
20 private Text first;
21 private Text second;
22
23 //构造方法,这个是必须要的
24 public CreateWritable(){
25
26 }
27
28 public CreateWritable(String first,String second){
29 set(new Text(first), new Text(second));
30 }
31
32 public CreateWritable(Text first,Text second){
33 set(first,second);
34 }
35
36
37
38 public Text getFirst() {
39 return first;
40 }
41
42 public void setFirst(Text first) {
43 this.first = first;
44 }
45
46 public Text getSecond() {
47 return second;
48 }
49
50 public void setSecond(Text second) {
51 this.second = second;
52 }
53
54 public void set(Text first,Text second){
55 this.first=first;
56 this.second=second;
57 }
58
59
60 @Override
61 public void readFields(DataInput in) throws IOException {
62 // TODO Auto-generated method stub
63 first.readFields(in);
64 second.readFields(in);
65 }
66
67 @Override
68 public void write(DataOutput out) throws IOException {
69 // TODO Auto-generated method stub
70 System.out.println(first);
71 first.write(out);
72 second.write(out);
73 }
74
75 @Override
76 public int compareTo(CreateWritable other) {
77 // TODO Auto-generated method stub
78 int cmp=first.compareTo(other.getFirst());
79 if(cmp!=0){
80 return cmp;
81 }
82 return second.compareTo(other.getSecond());
83 }
84
85 @Override
86 public int hashCode() {
87 // TODO Auto-generated method stub
88 return first.hashCode()*163 + second.hashCode();
89 }
90
91 @Override
92 public String toString() {
93 // TODO Auto-generated method stub
94 return first+"\t"+second;
95 }
96
97 @Override
98 public boolean equals(Object obj) {
99 // TODO Auto-generated method stub
100 if(obj instanceof CreateWritable){
101 CreateWritable create = (CreateWritable) obj;
102 return first.equals(create.getFirst()) && second.equals(create.getSecond());
103 }
104 return false;
105 }
106
107
108 //创建静态内部类:CreateWritable.Comparator
109 public static class Comparator extends WritableComparator{
110
111 public Comparator() {
112 super(CreateWritable.class);
113 // TODO Auto-generated constructor stub
114 }
115 private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
116
117 @Override
118 public int compare(byte[] b1, int s1, int l1, byte[] b2,
119 int s2, int l2) {
120 // TODO Auto-generated method stub
121 int firstL1 = 0,firstL2 = 0;
122 try {
123 firstL1 = WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1, s1);
124 firstL2 = WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2, s2);
125 int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
126 if(cmp !=0){
127 return cmp;
128 }
129 } catch (IOException e) {
130 // TODO Auto-generated catch block
131 e.printStackTrace();
132 }
133 return TEXT_COMPARATOR.compare(b1, s1+firstL1, l1-firstL1, b2, s2+firstL2, l2-firstL2);
134 }
135
136 }
137 static{
138 WritableComparator.define(CreateWritable.class,new Comparator());
139 }
140 }
Writable07.java
1 package cn.roboson.writable;
2
3 import java.io.ByteArrayOutputStream;
4 import java.io.DataOutputStream;
5 import java.io.IOException;
6
7 import org.apache.hadoop.io.RawComparator;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.io.Writable;
10 import org.apache.hadoop.io.WritableComparator;
11
12 public class Writable07 {
13
14 public static void main(String[] args) throws IOException {
15
16 CreateWritable create01 = new CreateWritable(new Text("Hadoop"), new Text("1"));
17 CreateWritable create02 = new CreateWritable(new Text("Hadoop"), new Text("2"));
18 byte[] b1 = serialize(create01);
19 byte[] b2 = serialize(create02);
20 RawComparator comparator = WritableComparator.get(CreateWritable.class);
21 int cmp =comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
22 if(cmp ==0){
23 System.out.println("create01 == create02");
24 }else if(cmp ==-1){
25 System.out.println("create01create02");
28 }
29 }
30
31 public static byte[] serialize(Writable writable) throws IOException{
32 ByteArrayOutputStream out = new ByteArrayOutputStream();
33 DataOutputStream dataOut = new DataOutputStream(out);
34 writable.write(dataOut);
35 return out.toByteArray();
36
37 }
38 }
运行结果:
分析:
在上面CreateWritable的内部类Comparator中,需要实现compare()方法。那么,compare()该如何实现,才能直接对序列化数据流进行比较呢?
CreateWritable由两个Text对象组成(Text first, Text second),而Text对象的二进制表示由两部分组成:一个变长整数(VInt),记录字符串UTF-8编码的字节数;以及UTF-8字节本身。诀窍在于读取该起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传递给Text的RawComparator(Text.Comparator);最后计算出第一个字符串和第二个字符串恰当的偏移量,便可以实现对象的比较。
也就是说,Text对象序列化后由两部分组成:一部分是记录字符串字节数的变长整数,另一部分是字符串本身的UTF-8字节!
变长整数本身所占用的字节数:WritableUtils.decodeVIntSize()方法根据该变长整数的首字节,返回这一字节数。
字符串本身的字节数:WritableComparator的readVInt()方法读出该变长整数的值,即字符串本身的字节长度。
两者相加,就是第一个Text对象序列化后的总长度。这样就方便了:先比较first的序列化数据流,再根据结果判断是否还需要比较second的序列化数据流。
|
|