设为首页 收藏本站
查看: 1240|回复: 0

[经验分享] Hadoop Writable深度复制及读取任意序列文件

[复制链接]

尚未签到

发表于 2016-12-13 06:45:55 | 显示全部楼层 |阅读模式
  上次留了一个问题如何实现Writable的深度复制,上网找了下,还真有这个类,叫做WritableDeepCopier,可以在http://mvnrepository.com/artifact/org.apache.crunch/crunch/0.5.0-incubating进行下载;下载导入,然后编程调用,但是如何调用?网上找了很多,但是都没有例子,哎,还是自己摸索吧,结果搞了一点时间还是不行,调用出错。然后就去看源码,它的deepCopy方法可以直接借鉴即可

public T More ...deepCopy(T source) {
50    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
51    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
52    T copiedValue = null;
53    try {
54      source.write(dataOut);
55      dataOut.flush();
56      ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
57      DataInput dataInput = new DataInputStream(byteInStream);
58      copiedValue = writableClass.newInstance();
59      copiedValue.readFields(dataInput);
60    } catch (Exception e) {
61      throw new CrunchRuntimeException("Error while deep copying " + source, e);
62    }
63    return copiedValue;
64  }上面代码可以看出只要传入classWritable变量即可使用这个方法了,所以编写了下面的测试代码:
package mahout.fansy.utils.read;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.types.writable.WritableDeepCopier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
public class ReadArbiKV {
/**
* 读取任意<key,value>序列文件
*/
public static Configuration conf=new Configuration();
public static WritableDeepCopier<Writable> wdc;
static String fPath="";
static String trainPath="";
static{
conf.set("mapred.job.tracker", "ubuntu:9001");
fPath="hdfs://ubuntu:9000/home/mahout/mahout-work-mahout/labelindex"; //  数据文件
}
public static void main(String[] args) throws IOException {
readFromFile(fPath);
//readFromFile(trainPath);
}
/**
* 读取序列文件
* @param fPath
* @return
* @throws IOException
*/
public static Map<Writable,Writable> readFromFile(String fPath) throws IOException{
FileSystem fs = FileSystem.get(URI.create(fPath), conf);
Path path = new Path(fPath);
Map<Writable,Writable> map=new HashMap<Writable,Writable>();
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable)
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable)
ReflectionUtils.newInstance(reader.getValueClass(), conf);
@SuppressWarnings("unchecked")
Class<Writable> writableClassK=(Class<Writable>) reader.getKeyClass();
@SuppressWarnings("unchecked")
Class<Writable> writableClassV=(Class<Writable>) reader.getValueClass();
while (reader.next(key, value)) {
// Writable k=;  // 如何实现Writable的深度复制?
Writable k=deepCopy(key, writableClassK); // Writable 的深度复制
Writable v=deepCopy(value,writableClassV);
map.put(k, v);
//  System.out.println(key.toString()+", "+value.toString());
//  System.exit(-1);// 只打印第一条记录
}
} finally {
IOUtils.closeStream(reader);
}
return map;
}
/**
* Writable 的深度复制
* 引自WritableDeepCopier
* @param fPath
* @return
* @throws IOException
*/
public static Writable deepCopy(Writable source,Class<Writable> writableClass) {
ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(byteOutStream);
Writable copiedValue = null;
try {
source.write(dataOut);
dataOut.flush();
ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
DataInput dataInput = new DataInputStream(byteInStream);
copiedValue = writableClass.newInstance();
copiedValue.readFields(dataInput);
} catch (Exception e) {
throw new CrunchRuntimeException("Error while deep copying " + source, e);
}
return copiedValue;
}
}



上面的代码初步测试ok,这个可以把任意的<key,value>的序列文件(虽说是任意,但是key和value还有实现Writable接口才行)进行读取,并且读取到一个Map类中。  

  分享,成长,快乐
  转载请注明blog地址:http://blog.csdn.net/fansy1990
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313343-1-1.html 上篇帖子: 原创抢鲜教程:快用Cloudera SCM Express管理你的hadoop集群吧 下篇帖子: hadoop学习路线之:ant简介及其使用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表