1. The cluster is set up and has passed the WordCount test, but programs developed in Eclipse still run only on the namenode.
I am not sure whether this is because the Map/Reduce Locations in Eclipse are not configured properly; my feeling is that it is related to the mapred.XXX.XXX configuration options. (If anyone knows how to configure Eclipse so that jobs submitted from it run on all machines in the cluster, please leave your contact details or write to me at dennias.chiu@gmail.com, so we can learn from each other!)
However! Once you generate a jar (Eclipse's Export -> Runnable JAR file is enough) and run "hadoop jar xxx.jar XXX" in a terminal (I am on Ubuntu 12.10), you will find the job running across the cluster, and compared with pseudo-distributed mode the speed difference really is like MotoGP versus race walking!
But! One thing to watch out for: if the program is missing the line job.setJarByClass(xxxx.class), it will run without problems in Eclipse, yet in the terminal it will fail with an error such as:
INFO mapred.JobClient: Task Id : attempt_201305071019_0002_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException: scut.chiuzzf.mapreduce.GetPageUrlJob$GetPageUrlMapper
This can probably be fixed by editing the jar's MANIFEST.MF file, but with project time being tight I did not get around to trying it. (Feel free to give it a go; if it works, please share your findings.)
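For reference, a minimal driver sketch showing where the call belongs. The mapper body and the I/O types here are assumptions, modeled loosely on the class names in the error above:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GetPageUrlJob {

    public static class GetPageUrlMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Hypothetical mapper body: emit each line keyed by its byte offset.
            context.write(new Text(key.toString()), value);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "get-page-url");
        // Without this line the tasktrackers cannot locate the jar that
        // contains GetPageUrlMapper, and map tasks fail with
        // ClassNotFoundException when the job is launched from the terminal.
        job.setJarByClass(GetPageUrlJob.class);
        job.setMapperClass(GetPageUrlMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}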
——2013/5/7

2. During cluster operation, a job that had always run fine suddenly throws a Java heap space error, as shown below:
INFO mapred.JobClient: Task Id : attempt_201001061331_0002_m_000027_0, Status : FAILED
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.&lt;init&gt;(MapTask.java:498)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.Child.main(Child.java:158)
Generally speaking, this is an Out Of Memory (OOM) problem. One possible cause is the program's own logic: memory requested by the program, or memory held by a previous task, is never released, so the system ends up spending more than 80% of its runtime in GC. Another possible cause is simply that too little memory was allocated.
The way a Hadoop MapReduce job runs is as follows: after the jobtracker receives a job submitted by a client, it distributes many tasks to the tasktrackers across the cluster for block-wise computation. As the code logic shows, each tasktracker actually launches a separate Java process to do the work, and that process uses dedicated ports and network machinery to keep data flowing between map and reduce. So these OOM errors are in fact OOM errors raised inside those child Java processes.

Possible solutions: for the first cause, add calls to System.gc() at suitable points in the program (this does not free memory at the exact call site; it merely suggests that the JVM reclaim memory, and when the collection actually runs is decided by the JVM's own machinery). For the second cause, see the sketch below.
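A minimal sketch for the second cause, assuming a Hadoop 1.x-era cluster where the per-task heap is controlled by mapred.child.java.opts (the class and method names here are hypothetical):

import org.apache.hadoop.mapred.JobConf;

public class ChildHeapConfig {
    // Raise the heap of every map/reduce child JVM above the 200 MB default.
    // The value must still fit within the memory available on each
    // tasktracker, so tune it to your cluster.
    public static JobConf withLargerChildHeap(JobConf conf) {
        conf.set("mapred.child.java.opts", "-Xmx512m");
        return conf;
    }
}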
3. Dealing with lots of small files in Hadoop MapReduce with CombineFileInputFormat
Input to a Hadoop MapReduce job is abstracted by InputFormat. FileInputFormat is the default implementation that deals with files in HDFS. With FileInputFormat, each file is split into one or more InputSplits, typically upper-bounded by the block size. This means the number of input splits is lower-bounded by the number of input files. That is not an ideal situation for MapReduce when dealing with a large number of small files, because the overhead of coordinating distributed processes is far greater than when there is a relatively small number of large files. Note also that when an input split spans block boundaries, it can work against the general rule of 'having the process close to the data', because the blocks could be at different network locations.
Enter CombineFileInputFormat: it packs many files into each split so that each mapper has more to process. CombineFileInputFormat takes node and rack locality into account when deciding which blocks to place in the same split, so it does not suffer from the problems of simply using a big split size.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueLineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class MyCombineFileInputFormat extends CombineFileInputFormat<Text, Text> {

    public static class MyKeyValueLineRecordReader implements RecordReader<Text, Text> {

        private final KeyValueLineRecordReader delegate;

        // CombineFileRecordReader constructs one of these per file chunk in the
        // CombineFileSplit; idx identifies which chunk this reader should read.
        public MyKeyValueLineRecordReader(
                CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx)
                throws IOException {
            FileSplit fileSplit = new FileSplit(
                    split.getPath(idx), split.getOffset(idx),
                    split.getLength(idx), split.getLocations());
            delegate = new KeyValueLineRecordReader(conf, fileSplit);
        }

        @Override
        public boolean next(Text key, Text value) throws IOException {
            return delegate.next(key, value);
        }

        @Override
        public Text createKey() {
            return delegate.createKey();
        }

        @Override
        public Text createValue() {
            return delegate.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return delegate.getPos();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }

        @Override
        public float getProgress() throws IOException {
            return delegate.getProgress();
        }
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public RecordReader<Text, Text> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new CombineFileRecordReader<Text, Text>(
                job, (CombineFileSplit) split, reporter,
                (Class) MyKeyValueLineRecordReader.class);
    }
}
CombineFileInputFormat is an abstract class that you extend, overriding the getRecordReader method. CombineFileRecordReader manages the multiple input splits inside a CombineFileSplit simply by constructing a new RecordReader for each chunk within it. MyKeyValueLineRecordReader creates a KeyValueLineRecordReader to delegate the actual operations to.
Remember to set mapred.max.split.size to a small multiple of the block size, in bytes; otherwise there will be no splitting at all.
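To tie it together, a hypothetical driver sketch (the class name, job name, and block size of 64 MB are assumptions) that wires in the input format and caps the combined split size:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class SmallFilesDriver {
    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(SmallFilesDriver.class);
        job.setJobName("combine-small-files");
        job.setInputFormat(MyCombineFileInputFormat.class);
        // Cap each combined split at two 64 MB blocks; without a cap,
        // CombineFileInputFormat will not break the input up at all.
        job.setLong("mapred.max.split.size", 2L * 64 * 1024 * 1024);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        JobClient.runJob(job);
    }
}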