设为首页 收藏本站
查看: 929|回复: 0

[经验分享] Hadoop学习笔记(一):MapReduce的输入格式

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-13 08:34:42 | 显示全部楼层 |阅读模式
  Hadoop学习有一段时间了,但是缺乏练手的项目,老是学了又忘。想想该整理一个学习笔记啥的,这年头打字比写字方便。果断开博客,咩哈哈~~
  开场白结束(木有文艺细胞)
   默认的MapReduce作业



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunner;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MinimalMapReducewithDefaults extends Configured implements Tool {
@Override
public int run(String[] arg0) throws Exception {
JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), arg0);
if (conf == null) {
return -1;
}
conf.setInputFormat(TextInputFormat.class);
conf.setNumMapTasks(1);
conf.setMapperClass(IdentityMapper.class);
conf.setMapRunnerClass(MapRunner.class);
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setPartitionerClass(HashPartitioner.class);
conf.setReducerClass(IdentityReducer.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
JobClient.runJob(conf);
return 0;
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MinimalMapReducewithDefaults(), args);
System.exit(exitCode);
//hdfs://192.168.174.128:9000/user/root/input/Temperature.txt hdfs://192.168.174.128:9000/user/root/output/test00001
    }
public static class JobBuilder {
public static JobConf parseInputAndOutput(Tool tool,
Configuration conf, String[] args) {
if (args.length != 2) {
printUsage(tool, " ");
return null;
}
JobConf jobConf = new JobConf(conf, tool.getClass());
FileInputFormat.addInputPath(jobConf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
return jobConf;
}
public static void printUsage(Tool tool, String extraArgsUsage) {
System.err.printf("Usage: % [genericOptions] %s\n\n", tool
.getClass().getSimpleName(), extraArgsUsage);
}
}
}
  这些个默认设置都不需要显式的设置,但是需要知道默认设置的是啥,贴出来省的忘记了。要注意的是:
  1) conf.setInputFormat(TextInputFormat.class);  默认的输入格式;
  2) conf.setMapOutputKeyClass(LongWritable.class); conf.setMapOutputValueClass(Text.class); 默认的map输出格式,其实就是将原样输出而已;
  3) conf.setPartitionerClass(HashPartitioner.class); 默认的分区函数,没有特殊操作都会使用这个了。具体算法是:return (key.hashCode()&Integer.Max_VALUE)%numPartitions;
  4) conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class);还是原样输出的格式。
        输入分片
  一个输入分片(split)是指由单个map处理的输入块,每个map操作之处理一个输入分片。
  包含一个一字节为单位的长度和一组存储位置(即一组主机名)。
  inputsplit由inputformat创建。inputformat负责产生输入分片并将它们分割成记录。
  FileInputFormat类
  所有使用文件作为其数据源的inputformat实现的基类。
  提供两个功能:  1)定义哪些文件包含在一个作业的输入中;
  2)未输入文件生成分片的实现。
  输入分片大小: 由最小分片大小、最大分片大小、块大小决定。分片大小在[最小分片大小,最大分片大小]区间内,且取最接近块大小的值。
  FileInputFormat的子类:TextInputFormat(默认类型,键是LongWritable类型,值为Text类型,key为当前行在文件中的偏移量,value为当前行本身);
  KeyValueTextInputFormat(适合文件自带key,value的情况,只要指定分隔符即可,比较实用);
  NLineInputFormat(key为当前行在文件中的偏移量,value为当前行本身,和TextInputFormat不同的是这个类型会为每个mapper指定固定行数的输入分片,N为每个mapper收到的输入行数;
  mapred.line.input.format.linespermap属性控制N的值);
  SequenceFileInputFormat(使用sequencefile作为map的输入)。
   DBInputFormat类
  数据库输入,在map中使用jdbc操作数据库,由于多个map将并发操作,故最好用于加载小量的数据集。操作数据库一般使用sqoop。
  TextOutputFormat类
  把每条记录写为文本行,每个键值使用制表符分割,(当然也可以使用mapred.textoutputformat.separator属性改变默认的分隔符)与TextOutputFormat对应的输入格式是keyValueTextInputFormat。可以用NullWritable来省略输出的键或者值(或者两个都省略,即相当于
  NullOutputFormat)
  SequenceFileOutputFormat类
   即以SequenceFile的格式输出,如果输出需要作为后续MapReduce作业的输入,这是一种很好的输出格式。格式紧凑,容易被压缩。
  (昨天弄到太晚了,代码写完直接睡了,今天上。。。)   
  MultipleOutputFormat类
  这个类可以将数据写到多个文件中,比较实用。比如将输出的数据按一定的逻辑归类到不同文件中。(通常是按照输出的键或者值中的信息归类)
  这个类有两个实体子类:MultipleTextOutputFormat,属于TextOutputFormat的多版本文件;
  MultipleSequenceFileOutputFormat,属于SequenceFileOutputFormat的多版本文件。
  关键点:: MultipleOutputFormat类提供了一些子类覆盖来控制输出文件名的protected方法(generateFileNameForKeyValue()方法),这个方法的返回值将用来作为输出的文件名称
  这里提到一点,我使用的时候开始继承的是MultipleTextOutputFormat,但是怎么调试都无效,还是输出 part-00000文件。然后各种查资料,最后好像网上有人说hadoop0.20.0版本还是之前的版本中存在bug,必须继承MultipleOutputFormat类才有效。但是我装的是hadoop1.04,不过我用的仍然是老版本的API在写,不知道是否是这个原因,有大神看到的话帮忙解惑下,这里谢谢了!
  下面是我继承MultipleOutputFormat类实现reducer输出到多个文件的代码:
  



import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;

public class PartitionByStationUsingMultipleOutputormat extends Configured
implements Tool {
static class StationMapper extends MapReduceBase implements
Mapper {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
public void map(LongWritable key, Text value,
OutputCollector output, Reporter reporter)
throws IOException {
parser.parse(value);
output.collect(new Text(parser.getYear()), value);
}
}
static class StationReducer extends MapReduceBase implements
Reducer {
@Override
public void reduce(Text key, Iterator values,
OutputCollector output, Reporter reporter)
throws IOException {
while (values.hasNext()) {
output.collect(values.next(), NullWritable.get());
}
}
}
static class StationNameMultipleTextOutputFormat extends
MultipleOutputFormat {
private NcdcRecordParser parser = new NcdcRecordParser();
private OutputFormat theTextOutputFormat = null;  
protected String generateFileNameForKeyValue(Text key,
NullWritable value, String name) {
parser.parse(key);
return parser.getStationId() + "/" + parser.getYear();
//return name + "-" + parser.getStationId();
        }
@Override
protected RecordWriter getBaseRecordWriter(
FileSystem arg0, JobConf arg1, String arg2, Progressable arg3)
throws IOException {
// TODO Auto-generated method stub
if(theTextOutputFormat==null){
theTextOutputFormat= new TextOutputFormat();
}
return theTextOutputFormat.getRecordWriter(arg0, arg1, arg2, arg3);
}
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (conf == null) {
return -1;
}
conf.setMapperClass(StationMapper.class);
conf.setMapOutputKeyClass(Text.class);
conf.setReducerClass(StationReducer.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputFormat(StationNameMultipleTextOutputFormat.class);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new PartitionByStationUsingMultipleOutputormat(), args);
System.exit(exitCode);
//args
//hdfs://192.168.174.128:9000/user/root/input/Temperature.txt hdfs://192.168.174.128:9000/user/root/output/test1234567890
    }
}
  
  这里面引用了两个帮助类:JobBuilder,NcdcRecordParser。之后的笔记中也会使用到,就一并在这里贴出来了。
  JobBuilder:



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
public class JobBuilder {
public static JobConf parseInputAndOutput(Tool tool,
Configuration conf, String[] args) {
if (args.length != 2) {
printUsage(tool, " ");
return null;
}
JobConf jobConf = new JobConf(conf, tool.getClass());
FileInputFormat.addInputPath(jobConf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
return jobConf;
}
public static void printUsage(Tool tool, String extraArgsUsage) {
System.err.printf("Usage: % [genericOptions] %s\n\n", tool
.getClass().getSimpleName(), extraArgsUsage);
}
}
  
  NcdcRecordParser:



import org.apache.hadoop.io.Text;
public class NcdcRecordParser {
private static final int MISSING_TEMPERATURE = 9999;
private String year;
private String stationId;
private int airTemperature;
boolean isMalformed;
public void parse(String record) {
year = record.substring(5, 9);
stationId = record.substring(0, 2);
String airTemperatureString;
// Remove leading plus sign as parseInt doesn't like them
airTemperatureString = record.substring(15, 19);
try {
airTemperature = Integer.parseInt(airTemperatureString);
isMalformed = false;
} catch (NumberFormatException e) {
isMalformed = true;
}
}
public void parse(Text record) {
parse(record.toString());
}
public boolean isValidTemperature() {
return airTemperature != MISSING_TEMPERATURE && !isMalformed;
}
public boolean isMissingTemperature() {
return airTemperature == MISSING_TEMPERATURE;
}
public boolean isMalformedTemperature() {
return isMalformed;
}
public String getYear() {
return year;
}
public String getStationId() {
return stationId;
}
public int getAirTemperature() {
return airTemperature;
}
}
  
  
  
  
      输出文件截个图吧:
DSC0000.jpg
  
     截图中看到的是以getStationId()命名的子文件夹,在这些文件夹中是以getYear()命名的文件。
  MultipleOutputs类
  这个类用于在原有输出基础上附加输出,输出是指定名称的,可以写到一个文件或者多个文件中。
  使用这个类的静态方法addMultiNamedOutput来设置输出名称,(其实是输出文件名的前缀了,后面都会加上其他数据)
  需要注意的一点是,在这个静态方法中指定的名称,必须在Reducer中的MultipleOutputs类的实例方法 getCollector中接收,即作为这个方法的第一个参数传入,不对应的话我试了下会报错。这个报错是因为啥呢?(没找到,或者我还没理解到,以后补充吧,同时希望看到帖子的大神帮忙解惑,这里谢谢了!)
  
  
  下面是实现的代码,同样使用到了上面贴出来的帮助类:



import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
public class PartitionByStationUsingMultipleOutputs extends Configured
implements Tool {
static class StationMapper extends MapReduceBase implements Mapper{
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
public void map(LongWritable key, Text value,
OutputCollector output, Reporter reporter)
throws IOException {
parser.parse(value);
output.collect(new Text(parser.getStationId()), value);
}
}
static class MultipleOutputsReducer extends MapReduceBase implements Reducer{
private MultipleOutputs multipleOutputs;
@Override
public void configure(JobConf conf){
multipleOutputs = new MultipleOutputs(conf);
}
@Override
public void reduce(Text key, Iterator values,
OutputCollector output, Reporter reporter)
throws IOException {
OutputCollector collector = multipleOutputs.getCollector("station", key.toString(),reporter);
while(values.hasNext()){
collector.collect(NullWritable.get(), values.next());
}
}
@Override
public void close() throws IOException{
multipleOutputs.close();
}
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (conf == null) {
return -1;
}
conf.setMapperClass(StationMapper.class);
conf.setMapOutputKeyClass(Text.class);
conf.setReducerClass(MultipleOutputsReducer.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputFormat(TextOutputFormat.class);
MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class, NullWritable.class, Text.class);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new PartitionByStationUsingMultipleOutputs(), args);
System.exit(exitCode);
//args
//hdfs://192.168.174.128:9000/user/root/input/Temperature.txt hdfs://192.168.174.128:9000/user/root/output/testMultipleOutputs123456789
    }
}
  
   下面是运行结果截图:
DSC0001.jpg
  
  截图中看到的是以"station"+getStationid()+partNum命名的文件。
  上面主要介绍了MapReduce作业使用到的一些常用的输入格式,输出格式。(都是书本上的理论知识了,只能拿书本上的例子来练手,哎 苦于无项目实战,自己买机器配集群太不现实了,而且也搞不到实际需求和实际数据。悲催~~)
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85951-1-1.html 上篇帖子: hadoop的simple认证 下篇帖子: hadoop mapreduce 优化
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表