package org.apache.hadoop.io;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public final class WritableUtils {
  /**
   * Compressed stream -> decompressed bytes.
   * Data flow: DataInput -> buffer (byte[]) -> ByteArrayInputStream -> GZIPInputStream
   *            -> outbuf (byte[], decompressed) -> ByteArrayOutputStream (in memory)
   *            -> toByteArray()
   * Because the size of the decompressed data is not known in advance, the internal
   * buffer of a ByteArrayOutputStream is used to accumulate the decompressed bytes.
   */
  public static byte[] readCompressedByteArray(DataInput in) throws IOException {
    int length = in.readInt();
    if (length == -1) return null;
    byte[] buffer = new byte[length];
    in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
    GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
    byte[] outbuf = new byte[length];
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    int len;
    while ((len = gzi.read(outbuf, 0, outbuf.length)) != -1) {
      bos.write(outbuf, 0, len);
    }
    byte[] decompressed = bos.toByteArray();
    bos.close();
    gzi.close();
    return decompressed;
  }
  public static void skipCompressedByteArray(DataInput in) throws IOException {
    int length = in.readInt();
    if (length != -1) {
      skipFully(in, length);
    }
  }
  /**
   * bytes -> compressed output written to DataOutput.
   * Data flow: bytes -> GZIPOutputStream -> ByteArrayOutputStream (in memory)
   *            -> buffer (via toByteArray())
   *            -> DataOutput (the compressed byte count is written first, then the buffer)
   */
  public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
    if (bytes != null) {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      GZIPOutputStream gzout = new GZIPOutputStream(bos);
      gzout.write(bytes, 0, bytes.length);
      gzout.close();
      byte[] buffer = bos.toByteArray();
      int len = buffer.length;
      out.writeInt(len);
      out.write(buffer, 0, len);
      /* debug only! Once we have confidence, can lose this. */
      return ((bytes.length != 0) ? (100 * buffer.length) / bytes.length : 0);
    } else {
      out.writeInt(-1);
      return -1;
    }
  }
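  /*
   * Illustrative round-trip sketch, NOT part of the original Hadoop source (the
   * method name demoCompressedByteArrayRoundTrip is hypothetical). It shows the
   * wire format produced above -- a 4-byte compressed length followed by the gzip
   * payload -- and that the return value is the compressed size as a percentage
   * of the original size.
   */
  private static void demoCompressedByteArrayRoundTrip() throws IOException {
    byte[] original = new byte[4096];                        // all zeros: highly compressible
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(wire);
    int ratio = writeCompressedByteArray(out, original);     // percent of original size
    out.flush();
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
    byte[] restored = readCompressedByteArray(in);
    assert java.util.Arrays.equals(original, restored);
    System.out.println("compressed to " + ratio + "% of original size");
  }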
  /*
   * Decompresses the data read from DataInput in and decodes it as a UTF-8 String.
   */
  /* Ugly utility, maybe someone else can do this better */
  public static String readCompressedString(DataInput in) throws IOException {
    byte[] bytes = readCompressedByteArray(in);
    if (bytes == null) return null;
    return new String(bytes, "UTF-8");
  }
  /*
   * Encodes String s as UTF-8 bytes, then compresses them and writes the result to DataOutput.
   */
  public static int writeCompressedString(DataOutput out, String s) throws IOException {
    return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
  }
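  /*
   * Illustrative sketch, NOT part of the original Hadoop source (the method name
   * demoCompressedStringRoundTrip is hypothetical). Compressed strings round-trip
   * through the same int-length + gzip-payload format, and a null String is
   * written as the length -1 and read back as null.
   */
  private static void demoCompressedStringRoundTrip() throws IOException {
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(wire);
    writeCompressedString(out, "hello, hadoop");
    writeCompressedString(out, null);                        // encoded as length -1
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
    assert "hello, hadoop".equals(readCompressedString(in));
    assert readCompressedString(in) == null;
  }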
  /*
   * Write a String as a Network Int n, followed by n Bytes.
   * Alternative to 16 bit read/writeUTF.
   * Encoding standard is... ?
   */
  public static void writeString(DataOutput out, String s) throws IOException {
    if (s != null) {
      byte[] buffer = s.getBytes("UTF-8");
      int len = buffer.length;
      out.writeInt(len);
      out.write(buffer, 0, len);
    } else {
      out.writeInt(-1);
    }
  }
  /*
   * Read a String as a Network Int n, followed by n Bytes.
   * Alternative to 16 bit read/writeUTF.
   * Encoding standard is... ?
   */
  public static String readString(DataInput in) throws IOException {
    int length = in.readInt();
    if (length == -1) return null;
    byte[] buffer = new byte[length];
    in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
    return new String(buffer, "UTF-8");
  }
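  /*
   * Illustrative sketch, NOT part of the original Hadoop source (the method name
   * demoWriteReadString is hypothetical). writeString/readString put a 4-byte
   * big-endian length on the wire followed by the UTF-8 bytes, so -- unlike
   * DataOutput.writeUTF, whose 2-byte length caps strings at 65535 encoded
   * bytes -- they can carry arbitrarily long strings and can represent null as -1.
   */
  private static void demoWriteReadString() throws IOException {
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(wire);
    writeString(out, "中文 also round-trips, since the payload is UTF-8");
    writeString(out, null);                                  // encoded as length -1
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
    System.out.println(readString(in));                      // original string
    System.out.println(readString(in));                      // null
  }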
  /*
   * Write a String array as a Network Int N, followed by Int N Byte Array Strings.
   * Could be generalised using introspection.
   */
public static vo