设为首页 收藏本站
查看: 4367|回复: 0

[经验分享] Hadoop中Writable类之四

[复制链接]

尚未签到

发表于 2015-7-12 10:09:46 | 显示全部楼层 |阅读模式
  1.定制Writable类型
  Hadoop中有一套Writable实现,例如:IntWritable、Text等,但是,有时候可能并不能满足自己的需求,这个时候,就需要自己定制Writable类型。
  定制分以下几步:


  • 需要实现WritableComparable接口,因为Writable常常作为健值对出现,而在MapReduce中,中间有个排序很重要,因此,Hadoop中就让Writable实现了WritableComparable
  • 需要实现WritableComparable的write()、readFields()、compareTo()方法
  • 需要重写java.lang.Object中的hashCode()、equals()、toString()方法。
  由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法
  如果要结合使用TextOutputFormat和定制的Writable,则许重写java.lang.Object的toString()方法
  
  Example:
  CreateWritable.java



  1 package cn.roboson.writable;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6
  7 import org.apache.hadoop.io.Text;
  8 import org.apache.hadoop.io.WritableComparable;
  9 /**
10  * 1.定制一个Writable类型,里面包含两个Text
11  * 2.和IntWritable等原有的Writable类似,它需要实现WritableComparable
12  * 3.由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法
13  * 4.如果要结合使用TextOutputFormat和定制的Writable,则许重写java.lang.Object的toString()方法
14  * @author roboson
15  *
16  */
17
18 public class CreateWritable implements WritableComparable{
19     
20     private Text first;
21     private Text second;
22     
23     //构造方法,这个是必须要的
24     public CreateWritable(){
25         
26     }
27     
28     public CreateWritable(String first,String second){
29         set(new Text(first), new Text(second));
30     }
31     
32     public CreateWritable(Text first,Text second){
33         set(first,second);
34     }
35     
36     
37     
38     public Text getFirst() {
39         return first;
40     }
41
42     public void setFirst(Text first) {
43         this.first = first;
44     }
45
46     public Text getSecond() {
47         return second;
48     }
49
50     public void setSecond(Text second) {
51         this.second = second;
52     }
53
54     public void set(Text first,Text second){
55         this.first=first;
56         this.second=second;
57     }
58     
59
60     @Override
61     public void readFields(DataInput in) throws IOException {
62         // TODO Auto-generated method stub
63         first.readFields(in);
64         second.readFields(in);
65     }
66
67     @Override
68     public void write(DataOutput out) throws IOException {
69         // TODO Auto-generated method stub
70         first.write(out);
71         second.write(out);
72     }
73
74     @Override
75     public int compareTo(CreateWritable other) {
76         // TODO Auto-generated method stub
77         int cmp=first.compareTo(other.getFirst());
78         if(cmp!=0){
79             return cmp;
80         }
81         return second.compareTo(other.getSecond());
82     }
83     
84     @Override
85     public int hashCode() {
86         // TODO Auto-generated method stub
87         return first.hashCode()*163 + second.hashCode();
88     }
89     
90     @Override
91     public String toString() {
92         // TODO Auto-generated method stub
93         return first+"\t"+second;
94     }
95     
96     @Override
97     public boolean equals(Object obj) {
98         // TODO Auto-generated method stub
99         if(obj instanceof CreateWritable){
100             CreateWritable create = (CreateWritable) obj;
101             return first.equals(create.getFirst()) && second.equals(create.getSecond());
102         }
103         return false;
104     }
105
106 }
  
  Writable06.java



1 package cn.roboson.writable;
2
3 import org.apache.hadoop.io.Text;
4
5 public class Writable06 {
6     
7     public static void main(String[] args) {
8         CreateWritable createWritable01 = new CreateWritable("Hadoop", "roboson");
9         CreateWritable createWritable02 = new CreateWritable(new Text("Hadoop"), new Text("roboson"));
10         
11         //重写了equals()方法,相比叫first 和 second两者都相同的时候,返回true
12         System.out.println(createWritable01.equals(createWritable02));
13         
14         //重写了compareTo()方法,先比较first,再比较second,返回0:相等 -1:firstsecond
15         System.out.println(createWritable01.compareTo(createWritable02));
16         
17         //重写了toString()方法,返回 first +"\t"+second
18         System.out.println(createWritable01.toString());
19     }
20 }
  
  运行结果:

   DSC0000.png
  
  
  2.为速度实现一个RawComparator
  关于Comparator方面的知识,可以参考我的博文《Hadoop中WritableComparable 和 comparator》,Comparator中,有一个方法是compare(byte[] b1,int s1,int l1,byte[] b2,int s2, int l2),该方法直接比较的是序列化,不必先将序列化数据流转化为对象,再进行比较,所以,与上面的compareTo()方法,相比,其更高效!其实,我们查看HadoopAPI,就可以发现,基本类型的Writable类都实现有了Comparator作为其内部类:
DSC0001.png
  好,再看看IntWritable.Comparator这个类,如下图所示,发现它是一个静态类,好好观察它的结构:
DSC0002.png
  
  通过上面的,我们可以发现,要为一个Writable实现一个Comparator,安装Hadoop的格式来,需要注意以下几点:


  • 将其在内部实现,作为内部类
  • 是一个静态的内部类
  • 注册,以便可以通过WritableComparator直接创建
  至于注册方面的知识,可以查看我的博客《Hadoop中Comparator原理》
  Example:给上面自定义的CreateWritable类实现一个Comparator,也就是CreateWritable.Comparator类
  CreateWritable.java



  1 package cn.roboson.writable;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.util.Comparator;
  7
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.io.WritableComparable;
10 import org.apache.hadoop.io.WritableComparator;
11 import org.apache.hadoop.io.WritableUtils;
12 /**
13  * 1.给定制的CreateWritable类实现一个Comparator,CreateWritable.Comparator
14  * @author roboson
15  *
16  */
17
18 public class CreateWritable implements WritableComparable{
19     
20     private Text first;
21     private Text second;
22     
23     //构造方法,这个是必须要的
24     public CreateWritable(){
25         
26     }
27     
28     public CreateWritable(String first,String second){
29         set(new Text(first), new Text(second));
30     }
31     
32     public CreateWritable(Text first,Text second){
33         set(first,second);
34     }
35     
36     
37     
38     public Text getFirst() {
39         return first;
40     }
41
42     public void setFirst(Text first) {
43         this.first = first;
44     }
45
46     public Text getSecond() {
47         return second;
48     }
49
50     public void setSecond(Text second) {
51         this.second = second;
52     }
53
54     public void set(Text first,Text second){
55         this.first=first;
56         this.second=second;
57     }
58     
59
60     @Override
61     public void readFields(DataInput in) throws IOException {
62         // TODO Auto-generated method stub
63         first.readFields(in);
64         second.readFields(in);
65     }
66
67     @Override
68     public void write(DataOutput out) throws IOException {
69         // TODO Auto-generated method stub
70         System.out.println(first);
71         first.write(out);
72         second.write(out);
73     }
74
75     @Override
76     public int compareTo(CreateWritable other) {
77         // TODO Auto-generated method stub
78         int cmp=first.compareTo(other.getFirst());
79         if(cmp!=0){
80             return cmp;
81         }
82         return second.compareTo(other.getSecond());
83     }
84     
85     @Override
86     public int hashCode() {
87         // TODO Auto-generated method stub
88         return first.hashCode()*163 + second.hashCode();
89     }
90     
91     @Override
92     public String toString() {
93         // TODO Auto-generated method stub
94         return first+"\t"+second;
95     }
96     
97     @Override
98     public boolean equals(Object obj) {
99         // TODO Auto-generated method stub
100         if(obj instanceof CreateWritable){
101             CreateWritable create = (CreateWritable) obj;
102             return first.equals(create.getFirst()) && second.equals(create.getSecond());
103         }
104         return false;
105     }
106     
107     
108     //创建静态内部类:CreateWritable.Comparator
109     public static class Comparator extends WritableComparator{
110
111         public Comparator() {
112             super(CreateWritable.class);
113             // TODO Auto-generated constructor stub
114         }
115         private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
116         
117         @Override
118         public int compare(byte[] b1, int s1, int l1, byte[] b2,
119                 int s2, int l2) {
120             // TODO Auto-generated method stub
121             int firstL1 = 0,firstL2 = 0;
122             try {
123                 firstL1 = WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1, s1);
124                 firstL2 = WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2, s2);
125                 int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
126                 if(cmp !=0){
127                     return cmp;
128                 }
129             } catch (IOException e) {
130                 // TODO Auto-generated catch block
131                 e.printStackTrace();
132             }
133             return TEXT_COMPARATOR.compare(b1, s1+firstL1, l1-firstL1, b2, s2+firstL2, l2-firstL2);
134         }
135         
136     }
137     static{
138         WritableComparator.define(CreateWritable.class,new Comparator());
139     }
140 }
  
  Writable07.java



1 package cn.roboson.writable;
2
3 import java.io.ByteArrayOutputStream;
4 import java.io.DataOutputStream;
5 import java.io.IOException;
6
7 import org.apache.hadoop.io.RawComparator;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.io.Writable;
10 import org.apache.hadoop.io.WritableComparator;
11
12 public class Writable07 {
13     
14     public static void main(String[] args) throws IOException {
15         
16         CreateWritable create01 = new CreateWritable(new Text("Hadoop"), new Text("1"));
17         CreateWritable create02 = new CreateWritable(new Text("Hadoop"), new Text("2"));
18         byte[] b1 = serialize(create01);
19         byte[] b2 = serialize(create02);
20         RawComparator comparator = WritableComparator.get(CreateWritable.class);
21         int cmp =comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
22         if(cmp ==0){
23             System.out.println("create01 == create02");
24         }else if(cmp ==-1){
25             System.out.println("create01create02");
28         }
29     }
30     
31     public static byte[] serialize(Writable writable) throws IOException{
32         ByteArrayOutputStream out = new ByteArrayOutputStream();
33         DataOutputStream dataOut = new DataOutputStream(out);
34         writable.write(dataOut);
35         return out.toByteArray();
36         
37     }
38 }
  
  运行结果:
   DSC0003.png
  分析:
  在上面的CreateWritable的内部类Comparator中,需要实现一个方法compare()方法,那么compare该如何实现,才能够对序列化数据流进行比较。
  CreateWritable是由两个Text对象组成的(Text first, Text second),而Text对象的二进制表示,是一个长度可变的整数,包含字符串之UTF-8表示的字节数以及UTF-8字节本身。诀窍在于读取该对象的起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传递给Text对象的RawComparator方法,最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的比较。
  也就是说Text对象序列化后,也是由两部分组成,一部分是记录本身有多少个字节,另一部分就是它自己的字节数!
  记录它本身有多少个字节所占用的字节长度:WritableUtils.decodeVintSize()方法返回一个整数,代表它的字节长度。
  它自己本身的字节数:WritableComparator的readVInt()方法返回一个整数,代表它本身的字节长度。
  这样就方便了,相比叫first的序列化数据流,根据结果,判断,看是否需要再比较second的序列化数据流。
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85708-1-1.html 上篇帖子: Hadoop 2.2.0 Job源代码阅读笔记 下篇帖子: Hadoop伪分布模式安装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表