|
package zhouls.bigdata.myMapReduce.weather;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public>
// 1949-10-01 14:21:02 34c WeatherMapper
// 1949-10-02 14:01:02 36c
// 1950-01-01 11:21:02 32c 分区在MyPartitioner.java
// 1950-10-01 12:21:02 37c
// 1951-12-01 12:21:02 23c 排序在MySort.java
// 1950-10-02 12:21:02 41c
// 1950-10-03 12:21:02 27c 分组在MyGroup.java
// 1951-07-01 12:21:02 45c
// 1951-07-02 12:21:02 46c 再,WeatherReducer
// 1951-07-03 12:21:03 47c
//key:每行第一个隔开符(制表符)左边为key,右边为value 自定义类型MyKey,洗牌,
static>
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
NullWritable v =NullWritable.get();
// 1949-10-01 14:21:02是自定义类型MyKey,即key
// 34c是DoubleWritable,即value
protected void map(Text key, Text value,Context context) throws IOException, InterruptedException {
try {
Date date =sdf.parse(key.toString());
Calendar c =Calendar.getInstance();
//Calendar 类是一个抽象类,可以通过调用 getInstance() 静态方法获取一个 Calendar 对象,
//此对象已由当前日期时间初始化,即默认代表当前时间,如 Calendar c = Calendar.getInstance();
c.setTime(date);
int year =c.get(Calendar.YEAR);
int month =c.get(Calendar.MONTH);
double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
MyKey k =new MyKey();
k.setYear(year);
k.setMonth(month);
k.setHot(hot);
context.write(k, new DoubleWritable(hot));
} catch (Exception e) {
e.printStackTrace();
}
}
}
static>
protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,Context arg2)throws IOException, InterruptedException {
int i=0;
for(DoubleWritable v :arg1){
i++;
String msg =arg0.getYear()+"\t"+arg0.getMonth()+"\t"+v.get();//"\t"是制表符
arg2.write(new Text(msg), NullWritable.get());
if(i==3){
break;
}
}
}
}
public static void main(String[] args) {
Configuration config =new Configuration();
// config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
// config.set("yarn.resourcemanager.hostname", "HadoopMaster");
// config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
// config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//默认分隔符是制表符"\t",这里自定义,如","
try {
FileSystem fs =FileSystem.get(config);
Job job =Job.getInstance(config);
job.setJarByClass(RunJob.class);
job.setJobName("weather");
job.setMapperClass(WeatherMapper.class);
job.setReducerClass(WeatherReducer.class);
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setPartitionerClass(MyPartitioner.class);
job.setSortComparatorClass(MySort.class);
job.setGroupingComparatorClass(MyGroup.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(KeyValueTextInputFormat.class);
// FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/weather.txt"));//输入路径,下有weather.txt
//
// Path outpath =new Path("hdfs://HadoopMaster:9000/out/weather");
FileInputFormat.addInputPath(job, new Path("./data/weather.txt"));//输入路径,下有weather.txt
Path outpath =new Path("./out/weather");
if(fs.exists(outpath)){
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
boolean f= job.waitForCompletion(true);
if(f){
}
} catch (Exception e) {
e.printStackTrace();
}
}
} |
|