设为首页 收藏本站
查看: 487|回复: 0

[经验分享] org.apache.hadoop.io.WritableUtils简单分析

[复制链接]

尚未签到

发表于 2016-12-11 08:19:00 | 显示全部楼层 |阅读模式
  



package org.apache.hadoop.io;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public final class WritableUtils  {

/**
* 压缩数据流 -> 解压后bytes
* 数据流向: DataInput -> buffer (byte[]) -> ByteArrayInputStream -> GZIPInputStream
*                    -> outbuf (byte[], decompressed) -> ByteOutputStream (memory)
*                    -> memoryToBytesArray
* 因为解压后的bytes大小未知,因此利用了ByteOutputStream自带的缓冲区来保存解压后的bytes
*/
public static byte[] readCompressedByteArray(DataInput in) throws IOException {
int length = in.readInt();
if (length == -1) return null;
byte[] buffer = new byte[length];
in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
byte[] outbuf = new byte[length];
ByteArrayOutputStream bos =  new ByteArrayOutputStream();
int len;
while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){
bos.write(outbuf, 0, len);
}
byte[] decompressed =  bos.toByteArray();
bos.close();
gzi.close();
return decompressed;
}
public static void skipCompressedByteArray(DataInput in) throws IOException {
int length = in.readInt();
if (length != -1) {
skipFully(in, length);
}
}
/**
* bytes -> 压缩后输出到DataOutput
* 数据流向: bytes -> GZIPOutputStream -> ByteArrayOutputStream (memory) -> buffer (memoryToBytesArray)
*                -> DataOutput (先写入压缩字节数,再写入buffer)
*/
public static int  writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
if (bytes != null) {
ByteArrayOutputStream bos =  new ByteArrayOutputStream();
GZIPOutputStream gzout = new GZIPOutputStream(bos);
gzout.write(bytes, 0, bytes.length);
gzout.close();
byte[] buffer = bos.toByteArray();
int len = buffer.length;
out.writeInt(len);
out.write(buffer, 0, len);
/* debug only! Once we have confidence, can lose this. */
return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0);
} else {
out.writeInt(-1);
return -1;
}
}

/*
* 直接将从DataInput in里的输入数据流解压缩后,以UTF-8形式解析到String中
*/
/* Ugly utility, maybe someone else can do this better  */
public static String readCompressedString(DataInput in) throws IOException {
byte[] bytes = readCompressedByteArray(in);
if (bytes == null) return null;
return new String(bytes, "UTF-8");
}
/*
* 先将String s以UTF-8形式变成bytes,然后压缩写入DataOutput
*/
public static int  writeCompressedString(DataOutput out, String s) throws IOException {
return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
}
/*
*
* Write a String as a Network Int n, followed by n Bytes
* Alternative to 16 bit read/writeUTF.
* Encoding standard is... ?
*
*/
public static void writeString(DataOutput out, String s) throws IOException {
if (s != null) {
byte[] buffer = s.getBytes("UTF-8");
int len = buffer.length;
out.writeInt(len);
out.write(buffer, 0, len);
} else {
out.writeInt(-1);
}
}
/*
* Read a String as a Network Int n, followed by n Bytes
* Alternative to 16 bit read/writeUTF.
* Encoding standard is... ?
*
*/
public static String readString(DataInput in) throws IOException{
int length = in.readInt();
if (length == -1) return null;
byte[] buffer = new byte[length];
in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
return new String(buffer,"UTF-8");  
}

/*
* Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
* Could be generalised using introspection.
*
*/
public static void writeStringArray(DataOutput out, String[] s) throws IOException{
out.writeInt(s.length);
for(int i = 0; i < s.length; i++) {
writeString(out, s);
}
}
/*
* Write a String array as a Nework Int N, followed by Int N Byte Array of
* compressed Strings. Handles also null arrays and null values.
* Could be generalised using introspection.
*
*/
public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{
if (s == null) {
out.writeInt(-1);
return;
}
out.writeInt(s.length);
for(int i = 0; i < s.length; i++) {
writeCompressedString(out, s);
}
}
/*
* Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
* Could be generalised using introspection. Actually this bit couldn't...
*
*/
public static String[] readStringArray(DataInput in) throws IOException {
int len = in.readInt();
if (len == -1) return null;
String[] s = new String[len];
for(int i = 0; i < len; i++) {
s = readString(in);
}
return s;
}

/*
* Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
* Could be generalised using introspection. Handles null arrays and null values.
*
*/
public static  String[] readCompressedStringArray(DataInput in) throws IOException {
int len = in.readInt();
if (len == -1) return null;
String[] s = new String[len];
for(int i = 0; i < len; i++) {
s = readCompressedString(in);
}
return s;
}

/*
*
* Test Utility Method Display Byte Array.
*
*/
public static void displayByteArray(byte[] record){
int i;
for(i=0;i < record.length -1; i++){
if (i % 16 == 0) { System.out.println(); }
System.out.print(Integer.toHexString(record  >> 4 & 0x0F));
System.out.print(Integer.toHexString(record & 0x0F));
System.out.print(",");
}
System.out.print(Integer.toHexString(record  >> 4 & 0x0F));
System.out.print(Integer.toHexString(record & 0x0F));
System.out.println();
}
/**
* Make a copy of a writable object using serialization to a buffer.
* @param orig The object to copy
* @return The copied object
*/
public static <T extends Writable> T clone(T orig, Configuration conf) {
try {
@SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T>
T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf);
ReflectionUtils.copy(conf, orig, newInst);
return newInst;
} catch (IOException e) {
throw new RuntimeException("Error writing/reading clone buffer", e);
}
}
/**
* Make a copy of the writable object using serialiation to a buffer
* @param dst the object to copy from
* @param src the object to copy into, which is destroyed
* @throws IOException
* @deprecated use ReflectionUtils.cloneInto instead.
*/
@Deprecated
public static void cloneInto(Writable dst, Writable src) throws IOException {
ReflectionUtils.cloneWritableInto(dst, src);
}
/**
* Serializes an integer to a binary stream with zero-compressed encoding.
* For -120 <= i <= 127, only one byte is used with the actual value.
* For other values of i, the first byte value indicates whether the
* integer is positive or negative, and the number of bytes that follow.
* If the first byte value v is between -121 and -124, the following integer
* is positive, with number of bytes that follow are -(v+120).
* If the first byte value v is between -125 and -128, the following integer
* is negative, with number of bytes that follow are -(v+124). Bytes are
* stored in the high-non-zero-byte-first order.
*
* @param stream Binary output stream
* @param i Integer to be serialized
* @throws java.io.IOException
*/
public static void writeVInt(DataOutput stream, int i) throws IOException {
writeVLong(stream, i);
}
/**
* Serializes a long to a binary stream with zero-compressed encoding.
* For -112 <= i <= 127, only one byte is used with the actual value.
* For other values of i, the first byte value indicates whether the
* long is positive or negative, and the number of bytes that follow.
* If the first byte value v is between -113 and -120, the following long
* is positive, with number of bytes that follow are -(v+112).
* If the first byte value v is between -121 and -128, the following long
* is negative, with number of bytes that follow are -(v+120). Bytes are
* stored in the high-non-zero-byte-first order.
*
* @param stream Binary output stream
* @param i Long to be serialized
* @throws java.io.IOException
*/
/*
* 将一个long类型的i,写入输出流DataOutput中
* 如果 -112 <= i <= 127,只使用一个byte表示i并写入输出流中
* 第一个字节表示i的正负和接下来表示i的字节数
* 如果第一个字节-113 <= v <= -120,那么i是正数,并且接下来i占的字节数是-(v+112)(也就是1到8个字节之间)
* 如果第一个字节-121 <= v <= -128,那么i是负数,并且接下来的i占的字节数是-(v+120)(也就是1到8个字节之间)
* 写入时先写i的高位,再写低位
*
*/
public static void writeVLong(DataOutput stream, long i) throws IOException {
if (i >= -112 && i <= 127) {
stream.writeByte((byte)i);
return;
}
int len = -112;
if (i < 0) {
i ^= -1L; // take one's complement'
len = -120;
}
long tmp = i;
while (tmp != 0) {
tmp = tmp >> 8;
len--;
}
stream.writeByte((byte)len);
len = (len < -120) ? -(len + 120) : -(len + 112);
for (int idx = len; idx != 0; idx--) {
int shiftbits = (idx - 1) * 8;
long mask = 0xFFL << shiftbits;
stream.writeByte((byte)((i & mask) >> shiftbits));
}
}

/**
* Reads a zero-compressed encoded long from input stream and returns it.
* @param stream Binary input stream
* @throws java.io.IOException
* @return deserialized long from stream.
*/
public static long readVLong(DataInput stream) throws IOException {
byte firstByte = stream.readByte();
int len = decodeVIntSize(firstByte);
if (len == 1) {
return firstByte;
}
long i = 0;
for (int idx = 0; idx < len-1; idx++) {
byte b = stream.readByte();
i = i << 8;
i = i | (b & 0xFF);
}
return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
/**
* Reads a zero-compressed encoded integer from input stream and returns it.
* @param stream Binary input stream
* @throws java.io.IOException
* @return deserialized integer from stream.
*/
public static int readVInt(DataInput stream) throws IOException {
return (int) readVLong(stream);
}
/**
* Given the first byte of a vint/vlong, determine the sign
* @param value the first byte
* @return is the value negative
*/
public static boolean isNegativeVInt(byte value) {
return value < -120 || (value >= -112 && value < 0);
}
/**
* Parse the first byte of a vint/vlong to determine the number of bytes
* @param value the first byte of the vint/vlong
* @return the total number of bytes (1 to 9)
*/
public static int decodeVIntSize(byte value) {
if (value >= -112) {
return 1;
} else if (value < -120) {
return -119 - value;
}
return -111 - value;
}
/**
* Get the encoded length if an integer is stored in a variable-length format
* @return the encoded length
*/
public static int getVIntSize(long i) {
if (i >= -112 && i <= 127) {
return 1;
}
if (i < 0) {
i ^= -1L; // take one's complement'
}
// find the number of bytes with non-leading zeros
int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
// find the number of data bytes + length byte
return (dataBits + 7) / 8 + 1;
}
/**
* Read an Enum value from DataInput, Enums are read and written
* using String values.
* @param <T> Enum type
* @param in DataInput to read from
* @param enumType Class type of Enum
* @return Enum represented by String read from DataInput
* @throws IOException
*/
public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType)
throws IOException{
return T.valueOf(enumType, Text.readString(in));
}
/**
* writes String value of enum to DataOutput.
* @param out Dataoutput stream
* @param enumVal enum value
* @throws IOException
*/
public static void writeEnum(DataOutput out,  Enum<?> enumVal)
throws IOException{
Text.writeString(out, enumVal.name());
}
/**
* Skip <i>len</i> number of bytes in input stream<i>in</i>
* @param in input stream
* @param len number of bytes to skip
* @throws IOException when skipped less number of bytes
*/
public static void skipFully(DataInput in, int len) throws IOException {
int total = 0;
int cur = 0;
while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) {
total += cur;
}
if (total<len) {
throw new IOException("Not able to skip " + len + " bytes, possibly " +
"due to end of input.");
}
}
/** Convert writables to a byte array */
public static byte[] toByteArray(Writable... writables) {
final DataOutputBuffer out = new DataOutputBuffer();
try {
for(Writable w : writables) {
w.write(out);
}
out.close();
} catch (IOException e) {
throw new RuntimeException("Fail to convert writables to a byte array",e);
}
return out.getData();
}
}

 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312531-1-1.html 上篇帖子: hadoop分析之三org.apache.hadoop.hdfs.server.namenode各个类的功能与角色 下篇帖子: 通过eclipse项目编译 hadoop 1.0.3 eclipse 4.2 ( juno ) plugin
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表