[Experience Sharing] How to emit N different types of value from a Hadoop map/reduce job

  BTW: once again lamenting my lack of machines. Processing a 3.4 GB corpus on a single machine took me a good ten hours, which is really frustrating. How nice it would be to have N machines.
  Quite often, especially when working with large data sets, we want a single MapReduce pass to solve several problems at once, so that we avoid reading the data a second time. For example, when doing text clustering/classification, the mapper reads the corpus, segments it into words, and must simultaneously compute each term's term frequency (TF) and its document frequency (DF). For a given term the former is really a vector giving the term's frequency in each of the N documents, while the latter is just a non-negative integer (a term appearing 3 times in doc1 and 5 times in doc2, say, contributes 3 and 5 to its TF vector but only 2 to its DF). So the same key needs values of quite different types, and this is where a special Writable class comes in handy: GenericWritable.
  The usage pattern is: extend GenericWritable, then register every Writable type you want to emit as a value in its static CLASSES field. In the TermMapper and TermReducer below my values are of three types: ArrayWritable, IntWritable, and my own TFWritable, so all three have to be added to TermWritable's CLASSES.

package redpoll.examples;

import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.Writable;

/**
 * Generic Writable class for terms.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class TermWritable extends GenericWritable {

  private static Class<? extends Writable>[] CLASSES = null;

  static {
    CLASSES = (Class<? extends Writable>[]) new Class[] {
        org.apache.hadoop.io.ArrayWritable.class,
        org.apache.hadoop.io.IntWritable.class,
        redpoll.examples.TFWritable.class
    };
  }

  public TermWritable() {
  }

  public TermWritable(Writable instance) {
    set(instance);
  }

  @Override
  protected Class<? extends Writable>[] getTypes() {
    return CLASSES;
  }
}
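
  The TFWritable registered above is my own class and is not listed in this post. As a rough idea of what such a <documentId, tf> pair could look like, here is a minimal sketch, assuming it carries nothing more than the document id and the weighted term frequency used in the mapper below:

package redpoll.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

/**
 * Sketch of a <documentId, tf> pair; assumed layout, the real TFWritable is not shown in this post.
 */
public class TFWritable implements Writable {

  private LongWritable documentId = new LongWritable();
  private IntWritable tf = new IntWritable();

  /** no-arg constructor, needed so ArrayWritable/GenericWritable can instantiate it reflectively */
  public TFWritable() {
  }

  public TFWritable(LongWritable documentId, IntWritable tf) {
    this.documentId = documentId;
    this.tf = tf;
  }

  public void write(DataOutput out) throws IOException {
    documentId.write(out);
    tf.write(out);
  }

  public void readFields(DataInput in) throws IOException {
    documentId.readFields(in);
    tf.readFields(in);
  }

  @Override
  public String toString() {
    return documentId + ":" + tf;
  }
}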

  When the Mapper collects its data, it wraps the actual Writable in the TermWritable just defined.

package redpoll.examples;

import java.io.IOException;
import java.io.StringReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;

/**
 * A class that does word segmentation and counts term TFs and DFs.<p>
 * in: key is document id, value is a document instance. <br>
 * output:
 * <li>key is term, value is a <documentId, tf> pair</li>
 * <li>key is term, value is the document frequency corresponding to the key</li>
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class TermMapper extends MapReduceBase implements
    Mapper<LongWritable, Document, Text, TermWritable> {

  private static final Log log = LogFactory.getLog(TermMapper.class.getName());

  /* analyzer for word segmentation */
  private Analyzer analyzer = null;
  /* frequency weight for the document title */
  private IntWritable titleWeight = new IntWritable(2);
  /* frequency weight for the document content */
  private IntWritable contentWeight = new IntWritable(1);

  public void map(LongWritable key, Document value,
      OutputCollector<Text, TermWritable> output, Reporter reporter)
      throws IOException {
    doMap(key, value.getTitle(), titleWeight, output, reporter);
    doMap(key, value.getContent(), contentWeight, output, reporter);
  }

  private void doMap(LongWritable key, String value, IntWritable weight,
      OutputCollector<Text, TermWritable> output, Reporter reporter)
      throws IOException {
    // do word segmentation
    TokenStream ts = analyzer.tokenStream("dummy", new StringReader(value));
    Token token = new Token();
    while ((token = ts.next(token)) != null) {
      String termString = new String(token.termBuffer(), 0, token.termLength());
      Text term = new Text(termString);
      // <term, <documentId, tf>>
      TFWritable tf = new TFWritable(key, weight);
      output.collect(term, new TermWritable(tf)); // wrap, then collect
      // <term, weight>
      output.collect(term, new TermWritable(weight)); // wrap, then collect
    }
  }

  @Override
  public void configure(JobConf job) {
    String analyzerName = job.get("redpoll.text.analyzer");
    try {
      if (analyzerName != null)
        analyzer = (Analyzer) Class.forName(analyzerName).newInstance();
    } catch (Exception excp) {
      excp.printStackTrace();
    }
    if (analyzer == null)
      analyzer = new StandardAnalyzer();
  }
}
  When the Reducer wants to get at the data, it can unwrap it:

package redpoll.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Forms a tf vector and calculates the df for each term.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class TermReducer extends MapReduceBase implements
    Reducer<Text, TermWritable, Text, Writable> {

  private static final Log log = LogFactory.getLog(TermReducer.class.getName());

  public void reduce(Text key, Iterator<TermWritable> values,
      OutputCollector<Text, Writable> output, Reporter reporter)
      throws IOException {
    ArrayList<TFWritable> tfs = new ArrayList<TFWritable>();
    int sum = 0;
    // log.info("term:" + key.toString());
    while (values.hasNext()) {
      Writable value = values.next().get(); // unwrap
      if (value instanceof TFWritable) {
        tfs.add((TFWritable) value);
      } else {
        sum += ((IntWritable) value).get();
      }
    }
    TFWritable[] writables = new TFWritable[tfs.size()];
    ArrayWritable aw = new ArrayWritable(TFWritable.class, tfs.toArray(writables));
    // wrap again
    output.collect(key, new TermWritable(aw));
    output.collect(key, new TermWritable(new IntWritable(sum)));
  }
}
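
  The job driver isn't shown in this post either. A rough sketch of how the pieces might be wired up is below; the class name, paths and the commented-out settings are made up for illustration, and the custom InputFormat that the Document input type implies is likewise not shown. The key point is declaring TermWritable as the output value class, so that all three wrapped types go through one serialization path:

package redpoll.examples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/**
 * Hypothetical driver sketch; not part of the original post.
 */
public class TermDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(TermDriver.class);
    conf.setJobName("term tf/df");

    conf.setMapperClass(TermMapper.class);
    conf.setReducerClass(TermReducer.class);

    // TermWritable wraps ArrayWritable, IntWritable and TFWritable,
    // so it is the single value class the framework sees
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(TermWritable.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(TermWritable.class);

    // the Document input type implies a custom InputFormat, not shown in the post
    // conf.setInputFormat(...);
    // optionally pick a Lucene analyzer by class name, as read in TermMapper.configure()
    // conf.set("redpoll.text.analyzer", "...");

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}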
  Strictly speaking, the collect calls in the reducer above no longer have to use TermWritable; I only keep it because I redefined the OutputFormat so that the records go to two different files, with different output types as well.
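
  That redefined OutputFormat isn't included in the post. One possible way to get a similar split with the old mapred API is to extend MultipleSequenceFileOutputFormat and pick the output file from the type wrapped inside the value; the sketch below (with an invented class name and invented df-/tf- file-name prefixes) only illustrates the routing idea, not the exact OutputFormat used here:

package redpoll.examples;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;

/**
 * Illustrative sketch only: send df records and tf-vector records to two
 * different output files, keyed off the type wrapped inside the TermWritable.
 */
public class TermOutputFormat extends MultipleSequenceFileOutputFormat<Text, TermWritable> {

  @Override
  protected String generateFileNameForKeyValue(Text key, TermWritable value, String name) {
    // IntWritable payloads are document frequencies, everything else is a tf vector
    return (value.get() instanceof IntWritable) ? "df-" + name : "tf-" + name;
  }
}

  Such a class would then be registered in the driver with conf.setOutputFormat(TermOutputFormat.class).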
