设为首页 收藏本站
查看: 680|回复: 0

[经验分享] hadoop 自定义inputformat和outputformat

[复制链接]

尚未签到

发表于 2016-12-9 09:39:31 | 显示全部楼层 |阅读模式
  hadoop的inputformat和outputformat
  最好的例子vertica :虽然是在pig中实现的udf,但是就是hadoop的inputformat和outputformat,在hive里也可以照用,贴个下载的地址:http://blackproof.iyunv.com/blog/1791995
  再贴一个项目中,在实现hadoop join时,用的inputformat和outputformat的简单实例:
  hadoop join在http://blackproof.iyunv.com/blog/1757530
  自定义inputformat(泛型是maper的input)

public class MyInputFormat extends FileInputFormat<MultiKey,Employee> {
public MyInputFormat(){}
@Override
public RecordReader<MultiKey, Employee> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return new MyRecordReader();
}
public static class MyRecordReader extends RecordReader<MultiKey, Employee>{
public LineReader in;
public MultiKey key;
public Employee value;
public StringTokenizer token = null;
public Text line;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
FileSplit fileSplit = (FileSplit)split;
Configuration job = context.getConfiguration();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(job);
FSDataInputStream filein = fs.open(file);
in = new LineReader(filein, job);
key = new MultiKey();
value = new Employee();
line = new Text();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
int linesize = in.readLine(line);
if(linesize==0)
return false;
String[] pieces = line.toString().split(",");
int i = Integer.valueOf(pieces[0]);
switch (i) {
case 1:
value.setEmpName(pieces[1]);
value.setFlag(1);
break;
default:
value.setDepartName(pieces[1]);
value.setFlag(2);
break;
}
value.setDepartId(pieces[2]);
value.setDepartNo(pieces[3]);
key.setDepartId(value.getDepartId());
key.setDepartNo(value.getDepartNo());
return true;
}
@Override
public MultiKey getCurrentKey() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return key;
}
@Override
public Employee getCurrentValue() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return 0;
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
}
}
  自定义outputformat(泛型是reduce的输出)

public class MyOutputFormat extends FileOutputFormat<Text, Employee> {
@Override
public RecordWriter<Text, Employee> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = job.getConfiguration();
Path file = getDefaultWorkFile(job, "");
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
return new MyRecordWriter(fileOut);
}
public static class MyRecordWriter extends RecordWriter<Text, Employee>{
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public static final String NEW_LINE = System.getProperty("line.separator");
public MyRecordWriter(DataOutputStream out){
this(out,":");
}
public MyRecordWriter(DataOutputStream out,String keyValueSeparator){
this.out = out;
this.keyValueSeparator = keyValueSeparator.getBytes();
}
@Override
public void write(Text key, Employee value) throws IOException,
InterruptedException {
if(key!=null){
out.write(key.toString().getBytes());
out.write(keyValueSeparator);
}
out.write(value.toString().getBytes());
out.write(NEW_LINE.getBytes());
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
out.close();
}
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-311791-1-1.html 上篇帖子: 自行实现Hadoop的多属性WritableComparable 下篇帖子: 深入理解Hadoop集群和网络 .
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表