
[Experience Sharing] Recent Project Progress and a Custom Hadoop InputFormat

Posted on 2016-12-11 11:02:20
  An aside: please check out http://code.google.com/p/redpoll
  It would be great if someone could lend me around 10 ordinary machines for testing; the school lab won't give me that many. I have already written a paper and the code is all ready. All that's missing is the data, which is really frustrating.
  Progress has been slow and I took a few detours, but things are finally sorted out. Once I started thinking about document clustering, I realized I need to be able to locate a document's content by its document id. Ideally this id should be an int or a long, but the docno that Sohu News provides is a 32-byte GUID. If this is ignored, Hadoop will inevitably produce huge intermediate files when computing term frequencies, of the form:
  term1    docId1:tf, docId2:tf, ....
  term2    docId1:tf, docId2:tf, ....
  ...
  For simplicity, I first considered using a database: a query like select content from table where documentid = x would implement this directly. However, the MySQL-Hadoop combination is still in its early days and I had not done any sharding, so the mappers would not read data in a distributed way. I then fiddled with HBase for a few days and wrote some code, only to find that after storing about 170,000 records it would not store any more, for reasons unknown. Besides, I found this BigTable clone is still in its infancy and has plenty of instability.
  No way around it, so I had to roll my own. Locating a document is actually simple: as long as you know its byte offset in the file, you can just seek to that offset. In Java a long takes 8 bytes, and HDFS generally favors files of 64 MB or more, so it makes sense to cat the 3.4 GB+ Sogou corpus (http://www.sogou.com/labs/dl/cs.html) into a single file and use the offset as the document ID. If the mapper is to receive key-value pairs other than <LongWritable, Text>, a custom InputFormat is required. I wrote a SogouInputFormat for the Sogou corpus, along with the corresponding RecordReader and Writable implementations. Unfortunately the school network is acting up and I cannot svn commit to Google Code.
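  To illustrate the idea (my sketch, not code from the post): looking a document back up by its offset key, using the SogouCorpusReader shown further down and the standard Hadoop FileSystem API. The HDFS path is a made-up example, and SogouDocument is assumed to have a no-arg constructor (its source is not included here).

package redpoll.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Sketch only: re-read one document from HDFS given its offset key. */
public class OffsetLookupSketch {
  public static void main(String[] args) throws Exception {
    long docOffset = Long.parseLong(args[0]);           // the long key emitted by the job
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // "/corpus/sogou.txt" is a made-up example path to the cat'ed corpus
    FSDataInputStream in = fs.open(new Path("/corpus/sogou.txt"));
    in.seek(docOffset);                                  // jump straight to the document
    SogouCorpusReader reader = new SogouCorpusReader(in, conf);
    SogouDocument doc = new SogouDocument();             // assumed no-arg constructor; class not shown in the post
    long posn = reader.nextDoc(doc);                     // parse the <doc>...</doc> that starts here
    System.out.println("document found at offset " + posn);
    reader.close();
  }
}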
  The Sogou corpus stores the text in an xml-like form. Since processing data at this scale has to be fast, DOM is out of the question. I first tried parsing with SAX, but that ran into trouble because some records are malformed. So I spent two evenings hand-writing two state machines for parsing; it now reads correctly and is reasonably fast. A single machine reads the corpus at about 51 MB/s on average, which means one machine can go through the 3.4 GB Sogou corpus in a bit over a minute. Better still, this approach can now run on the MapReduce model.
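  For reference, a record in the corpus looks roughly like the following (my illustration, inferred from the default tag names doc, url, docno, contenttitle and content used by the reader below; the values are invented placeholders):

<doc>
<url>http://news.sohu.com/...</url>
<docno>...32-byte GUID...</docno>
<contenttitle>...article title...</contenttitle>
<content>...article body...</content>
</doc>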
  Next up: word segmentation, then tf and df, and computing tf-idf to build the vector space model (VSM).
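  For concreteness (my addition, the post itself stops here): tf-idf refers to the standard weighting, where a term's weight in a document is its term frequency multiplied by log(N / df), with N the total number of documents and df the number of documents containing the term. A one-method sketch, not necessarily the exact formula the project will end up using:

/** Sketch only: standard tf-idf weight. Assumes df > 0. */
public static double tfIdf(int tf, int df, long numDocs) {
  // tf: occurrences of the term in this document; df: number of documents containing the term
  return tf * Math.log((double) numDocs / df);
}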
  Some code snippets:

package redpoll.examples;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
/**
 * Input format for sogou corpus.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class SogouInputFormat extends FileInputFormat<LongWritable, DocumentWritable>
    implements JobConfigurable {

  private CompressionCodecFactory compressionCodecs = null;

  public void configure(JobConf conf) {
    compressionCodecs = new CompressionCodecFactory(conf);
  }

  protected boolean isSplitable(FileSystem fs, Path file) {
    return compressionCodecs.getCodec(file) == null;
  }

  public RecordReader<LongWritable, DocumentWritable> getRecordReader(
      InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
    reporter.setStatus(genericSplit.toString());
    return new SogouRecordReader(job, (FileSplit) genericSplit);
  }
}
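How this InputFormat gets wired into a job driver is not shown in the post; below is a minimal sketch of my own using the old mapred API that matches the imports above. WordFrequencyMapper is a hypothetical mapper name and both paths are made-up examples.

package redpoll.examples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/** Sketch only: plugging SogouInputFormat into a job driver. */
public class SogouJobSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SogouJobSketch.class);
    conf.setJobName("sogou-term-frequency");
    // map input keys are the document offsets, values are DocumentWritable instances
    conf.setInputFormat(SogouInputFormat.class);
    // WordFrequencyMapper is hypothetical: a Mapper<LongWritable, DocumentWritable, ...> not shown in the post
    conf.setMapperClass(WordFrequencyMapper.class);
    FileInputFormat.setInputPaths(conf, new Path("/corpus/sogou.txt"));   // example input path
    FileOutputFormat.setOutputPath(conf, new Path("/corpus/tf"));         // example output path
    JobClient.runJob(conf);
  }
}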

 
package redpoll.examples;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
/**
 * A class that provides a sogou document reader from an input stream.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class SogouCorpusReader {
  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
  private int bufferSize = DEFAULT_BUFFER_SIZE;
  /* input stream which we will get documents from */
  private InputStream in;
  /* a buffer that stores temporary bytes read from the input stream */
  private byte[] buffer;
  /* the number of bytes of real data in the buffer */
  private int bufferLength = 0;
  /* the current position in the buffer */
  private int bufferPosn = 0;
  /* the buffer position in the input stream */
  private long bufferCurrentPosn = 0;
  /* the position of the current document in the input stream */
  private long currentDocPosn = 0;
  /* xml-like mark tags used in the sogou corpus */
  private byte[] docTag;
  private byte[] urlTag;
  private byte[] docnoTag;
  private byte[] titleTag;
  private byte[] contentTag;

  /* parser status */
  enum STATUS {
    PREPARE, START_ELEMENT, END_ELEMENT, TEXT
  };

  /* used for defining the current parsing node */
  enum NODE {
    NULL, DOC, URL, DOC_NO, TITLE, CONTENT, FAILED, SUCCEED
  };

  private STATUS currentSatus;
  private NODE currentNode;
  public SogouCorpusReader(InputStream in) throws IOException {
    this(in, DEFAULT_BUFFER_SIZE);
  }

  public SogouCorpusReader(InputStream in, int bufferSize) throws IOException {
    this(in, bufferSize, "doc", "url", "docno", "contenttitle", "content");
  }

  public SogouCorpusReader(InputStream in, int bufferSize, String doc,
      String url, String docno, String title, String content) throws IOException {
    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
    docTag = doc.getBytes("UTF-8");
    urlTag = url.getBytes("UTF-8");
    docnoTag = docno.getBytes("UTF-8");
    titleTag = title.getBytes("UTF-8");
    contentTag = content.getBytes("UTF-8");
  }

  public SogouCorpusReader(InputStream in, Configuration conf) throws IOException {
    this(in, conf.getInt("redpoll.sogou.doc.buffersize", DEFAULT_BUFFER_SIZE),
        conf.get("redpoll.sogou.doc", "doc"),
        conf.get("redpoll.sogou.doc.url", "url"),
        conf.get("redpoll.sogou.doc.docno", "docno"),
        conf.get("redpoll.sogou.doc.contenttitle", "contenttitle"),
        conf.get("redpoll.sogou.doc.content", "content"));
  }

  /**
   * Reads the next document from the sogou text file into the given
   * {@link SogouDocument}.
   * @param doc the {@link SogouDocument} instance to be filled.
   * @return the byte position of this document, or -1 if EOF was reached.
   * @throws IOException
   */
  public long nextDoc(SogouDocument doc) throws IOException {
    currentSatus = STATUS.PREPARE;
    currentNode = NODE.NULL;
    try {
      while (currentNode != NODE.SUCCEED) {
        adjustBuffer();
        if (currentSatus == STATUS.PREPARE) {
          if (buffer[bufferPosn] == '<')
            currentSatus = STATUS.START_ELEMENT;
        } else if (currentSatus == STATUS.START_ELEMENT) {
          if (buffer[bufferPosn] == '/') { // e.g. </node>
            currentSatus = STATUS.END_ELEMENT;
          } else {
            int start = bufferPosn;
            byte[] name = null;
            while (buffer[bufferPosn] != '>' && buffer[bufferPosn] != '\n') {
              bufferPosn++;
              if (bufferPosn >= bufferLength) {
                name = new byte[bufferLength - start];
                System.arraycopy(buffer, start, name, 0, bufferLength - start);
                start = 0;
              }
              adjustBuffer();
            }
            // if an element ends with '\n', we consider it a malformed element
            if (buffer[bufferPosn] == '\n')
              failed(); // FAILED
            else if (buffer[bufferPosn] == '>') {
              int len = bufferPosn - start;
              if (len > 0) {
                if (name != null) {
                  byte[] newname = new byte[name.length + len];
                  System.arraycopy(name, 0, newname, 0, name.length);
                  System.arraycopy(buffer, start, newname, name.length, len);
                  name = newname;
                } else {
                  name = new byte[len];
                  System.arraycopy(buffer, start, name, 0, len);
                }
                startElement(name);
              }
              ignoreWhite();
              currentSatus = STATUS.TEXT;
            }
          }
        } else if (currentSatus == STATUS.TEXT) {
          int start = bufferPosn;
          byte[] text = null;
          while (buffer[bufferPosn] != '<' && buffer[bufferPosn] != '\n') {
            bufferPosn++;
            if (bufferPosn >= bufferLength) {
              // FIXME: if the content of a document spans more than two buffers, the result will be wrong!
              text = new byte[bufferLength - start];
              System.arraycopy(buffer, start, text, 0, bufferLength - start);
              start = 0;
            }
            adjustBuffer();
          }
          if (buffer[bufferPosn] == '<') {
            int len = bufferPosn - start;
            if (len > 0) {
              if (text != null) {
                byte[] newtext = new byte[text.length + len];
                System.arraycopy(text, 0, newtext, 0, text.length);
                System.arraycopy(buffer, start, newtext, text.length, len);
                text = newtext;
              } else {
                text = new byte[len];
                System.arraycopy(buffer, start, text, 0, len);
              }
              characters(text, doc);
            }
            currentSatus = STATUS.START_ELEMENT;
          } else if (buffer[bufferPosn] == '\n')
            failed(); // FAILED
        } else if (currentSatus == STATUS.END_ELEMENT) {
          int start = bufferPosn;
          byte[] name = null;
          while (buffer[bufferPosn] != '>' && buffer[bufferPosn] != '\n') {
            bufferPosn++;
            if (bufferPosn >= bufferLength) {
              name = new byte[bufferLength - start];
              System.arraycopy(buffer, start, name, 0, bufferLength - start);
              start = 0;
            }
            adjustBuffer();
          }
          if (buffer[bufferPosn] == '>') {
            int len = bufferPosn - start;
            if (len > 0) {
              if (name != null) {
                byte[] newname = new byte[name.length + len];
                System.arraycopy(name, 0, newname, 0, name.length);
                System.arraycopy(buffer, start, newname, name.length, len);
                name = newname;
              } else {
                name = new byte[len];
                System.arraycopy(buffer, start, name, 0, len);
              }
              endElement(name);
            }
            ignoreWhite();
            currentSatus = STATUS.PREPARE;
          } else if (buffer[bufferPosn] != '\n')
            failed(); // FAILED
        }
        bufferPosn++;
      }
    } catch (EOFException eofe) {
      return -1;
    }
    return currentDocPosn;
  }
  /**
   * Close the underlying stream.
   * @throws IOException
   */
  public void close() throws IOException {
    in.close();
  }

  private void ignoreWhite() throws IOException, EOFException {
    do {
      bufferPosn++;
      adjustBuffer();
    } while (buffer[bufferPosn] == '\n' || buffer[bufferPosn] == '\r'
        || buffer[bufferPosn] == '\t' || buffer[bufferPosn] == ' ');
    bufferPosn--;
  }

  private void adjustBuffer() throws IOException, EOFException {
    if (bufferPosn >= bufferLength) {
      bufferCurrentPosn += bufferLength;
      bufferPosn = 0;
      bufferLength = in.read(buffer);
      if (bufferLength <= 0)
        throw new EOFException();
    }
  }

  private void startElement(byte[] name) {
    if ((currentNode == NODE.NULL || currentNode == NODE.FAILED) && equals(docTag, name)) {
      currentDocPosn = bufferCurrentPosn + bufferPosn - docTag.length - 1;
      currentNode = NODE.DOC;
    } else if (currentNode == NODE.DOC && equals(urlTag, name)) {
      currentNode = NODE.URL;
    } else if (currentNode == NODE.URL && equals(docnoTag, name)) {
      currentNode = NODE.DOC_NO;
    } else if (currentNode == NODE.DOC_NO && equals(titleTag, name)) {
      currentNode = NODE.TITLE;
    } else if (currentNode == NODE.TITLE && equals(contentTag, name)) {
      currentNode = NODE.CONTENT;
    } else {
      currentNode = NODE.FAILED;
    }
  }

  private void endElement(byte[] name) {
    if (currentNode == NODE.CONTENT && equals(contentTag, name)) {
      currentNode = NODE.SUCCEED;
    }
  }

  private void characters(byte[] text, SogouDocument doc) {
    if (currentNode == NODE.URL) {
      doc.setPathBytes(text);
    } else if (currentNode == NODE.DOC_NO) {
      doc.setIdBytes(text);
    } else if (currentNode == NODE.TITLE) {
      doc.setTitleBytes(text);
    } else if (currentNode == NODE.CONTENT) {
      doc.setContentBytes(text);
    }
  }

  private void failed() {
    currentNode = NODE.FAILED;
  }

  private boolean equals(final byte[] left, final byte[] right) {
    return left == null && right == null ? true :
        left == null && right != null ? false :
        left != null && right == null ? false :
        left.length != right.length ? false :
        WritableComparator.compareBytes(left, 0, left.length, right, 0, right.length) == 0;
  }
}
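As a usage note (my addition): the single-machine throughput mentioned above corresponds to a plain sequential scan like the sketch below. The local file path is a made-up example, and SogouDocument is again assumed to have a no-arg constructor since its source is not included in the post.

package redpoll.examples;

import java.io.FileInputStream;

/** Sketch only: scan the concatenated Sogou corpus locally with SogouCorpusReader. */
public class LocalScanSketch {
  public static void main(String[] args) throws Exception {
    // "/data/sogou.txt" is a made-up example path to the cat'ed corpus
    SogouCorpusReader reader = new SogouCorpusReader(new FileInputStream("/data/sogou.txt"));
    SogouDocument doc = new SogouDocument();   // assumed no-arg constructor; class not shown in the post
    long count = 0;
    long start = System.currentTimeMillis();
    while (reader.nextDoc(doc) != -1) {        // nextDoc returns the doc's offset, or -1 at EOF
      count++;
    }
    reader.close();
    long elapsed = System.currentTimeMillis() - start;
    System.out.println(count + " documents in " + elapsed + " ms");
  }
}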
