单位有一组业务一直都是使用Streaming压缩文本日志,大体上就是设置作业输出为BZ2格式,怎么输入就怎么输出,没有任何处理功能在里面。但是每行结尾都多出来一个TAB。终于,有一个业务需要使用TAB前的最后一个字段,不去掉不行了。 虽然是个小问题,但是网上搜了一圈,也没有很好的解决。很多人都遇到了,但是单位的业务比较特殊,只有map没有reduce。http://stackoverflow.com/questio ... unwanted-delimiters这个上面直接说“As I discussed with friends, there's no easy way to achieve the goal,...”。
Streaming有个特点,默认是按照TAB去区分Key和Value。如果没有设置Key字段的数目,默认一行里面第一个TAB之前的做Key,后面的是Value。如果没有找到Tab,就全都是Key字段,Value是空。之所以后面会多出个Tab,正是Key和Value之间的那个Tab。
首先是考察Streaming的Map,在PipeMapper.java。InputWriter处理输出,所以尝试实现自定义输出。在MapReduce作业配置里面,stream.map.input.writer.class负责指定InputWriter是哪一个,默认是TextInputWriter。Streaming在这里比较坑,增加-Dstream.map.input.writer.class=XXX的选项并不能令Streaming使用自定义的实现类,必须实现自己的IdentifierResolver,然后在其中对不同类型的输入设定不同类型的InputWriter,而其中的输入类型,必须由stream.map.input选项传入。是否设置成功以作业运行时候JobTracker的配置参数表为准。
不巧的是,使用自定义的InputWriter代替TextInputWriter,行尾的Tab是没了,行首又多了个数字。估计是Hadoop给Mapper传入的Key被打印出来了。oooorz....不能瞎猜了,还是看看代码吧。
好在代码蛮短的还是。
Streaming会把本身、以及用户-file -cacheFile -cacheArchive 等选项指定的文件,打成一个Jar包提交到集群进行MR作业。把集群的输出,作为用户实现Mapper的输入;读取用户实现Mapper的输出,作为整个Map作业的输出。Input/Output相对于用户自定义作业,Writer/Reader体现为Streaming的行为,因此是InputWriter和OutputReader。简单来讲,
public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
后记:
A. 网上很多是想办法修改分隔符,把TAB换成空字符。这是一个非常粗暴的做法,基本上就是埋坑!为什么呢?
日志文本内容可以是很丰富的,这次出问题是因为每行没有TAB。如果换做含有TAB的文本,把分隔符变为空串,就把日志中原有的TAB去掉了。
B. 之所以这么搞,也是受到了stackoverflow的这个Q&A的启发。http://stackoverflow.com/questio ... from-reducer-output。类似的,Q&A也是采用修改分隔符的办法,是不可取的。但是仔细发现,是可以在自己重写的TextOutputFormat<K,V>里,修改LineRecordWriter.write方法的。
重写TextOutputFormat是十分优雅的解决,看似修改了Hadoop本身的东西,但是在Streaming最新版没有加入这个fix之前,防止对每个版本的Streaming都要变更、重新编译打包。另外,Streaming不是独立的项目,编译它需要同时编译Hadoop!
用vim写Java打包确实略蛋疼,周一上班试试这个更加优雅的办法。
C. 虽然是修改了Streaming代码,但是不需要考虑会影响同一机器所有用户的问题,也不用修改$HADOOP_HOME下的Streaming包。streaming提供了这个参数stream.shipped.hadoopstreaming。
D. 有些设置似乎是指对Reducer生效,对于这种只有Mapper的作业不起作用。比如
mapred.textoutputformat.ignoreseparatormapred.textoutputformat.separator 设置了,没看到什么效果。
再有就是,命令行选项里面如果写-DXXX= \ 这样的语句,似乎也没有把这个参数设置为空串的效果,写-DXXX= ""也是一样。