|
这篇博客,给大家,体会不一样的版本编程。
代码
1 package zhouls.bigdata.myMapReduce.wordcount4;
2
3 import java.io.IOException;
4
5 import org.apache.hadoop.io.IntWritable;
6 import org.apache.hadoop.io.LongWritable;
7 import org.apache.hadoop.io.Text;
8 import org.apache.hadoop.mapreduce.Mapper;
9 import org.apache.hadoop.util.StringUtils;
10
11 public>
12
13 //该方法循环调用,从文件的split中读取每行调用一次,把该行所在的下标为key,该行的内容为value
14 protected void map(LongWritable key, Text value,
15 Context context)
16 throws IOException, InterruptedException {
17 String[] words = StringUtils.split(value.toString(), ' ');
18 for(String w :words){
19 context.write(new Text(w), new IntWritable(1));
20 }
21 }
22 }
1 package zhouls.bigdata.myMapReduce.wordcount4;
2
3 import java.io.IOException;
4
5 import org.apache.hadoop.io.IntWritable;
6 import org.apache.hadoop.io.Text;
7 import org.apache.hadoop.mapreduce.Reducer;
8
9 public>
10
11 //每组调用一次,这一组数据特点:key相同,value可能有多个。
12 protected void reduce(Text arg0, Iterable<IntWritable> arg1,
13 Context arg2)
14 throws IOException, InterruptedException {
15 int sum =0;
16 for(IntWritable i: arg1){
17 sum=sum+i.get();
18 }
19 arg2.write(arg0, new IntWritable(sum));
20 }
21 }
//System.setProperty("HADOOP_USER_NAME", "root");
//
//1、MR执行环境有两种:本地测试环境,服务器环境
//
//本地测试环境(windows):(便于调试)
//在windows的hadoop目录bin目录有一个winutils.exe
//1、在windows下配置hadoop的环境变量
//2、拷贝debug工具(winutils.exe)到HADOOP_HOME/bin
//3、修改hadoop的源码 ,注意:确保项目的lib需要真实安装的jdk的lib
//
//4、MR调用的代码需要改变:
//a、src不能有服务器的hadoop配置文件(因为,本地是调试,去服务器环境集群那边的)
//b、再调用是使用:
//Configuration config = new Configuration();
//config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
//config.set("yarn.resourcemanager.hostname", "HadoopMaster");
//服务器环境:(不便于调试),有两种方式。
//首先需要在src下放置服务器上的hadoop配置文件(都要这一步)
//1、在本地直接调用,执行过程在服务器上(真正企业运行环境)
//a、把MR程序打包(jar),直接放到本地
//b、修改hadoop的源码 ,注意:确保项目的lib需要真实安装的jdk的lib
//c、增加一个属性:
//config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
//d、本地执行main方法,servlet调用MR。
//
//
//2、直接在服务器上,使用命令的方式调用,执行过程也在服务器上
//a、把MR程序打包(jar),传送到服务器上
//b、通过: hadoop jar jar路径 类的全限定名
//
//
//
//
//a,1 b,1
//a,3 c,3
//a,2 d,2
//
//
//a,3 c,3
//a,2 d,2
//a,1 b,1
//
1 package zhouls.bigdata.myMapReduce.wordcount4;
2
3
4 import org.apache.hadoop.conf.Configuration;
5 import org.apache.hadoop.fs.FileSystem;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.io.IntWritable;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12
13 public>
14
15 public static void main(String[] args) {
16 Configuration config =new Configuration();
17 config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
18 config.set("yarn.resourcemanager.hostname", "HadoopMaster");
19 // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");//先打包好wc.jar
20 try {
21 FileSystem fs =FileSystem.get(config);
22
23 Job job =Job.getInstance(config);
24 job.setJarByClass(RunJob.class);
25
26 job.setJobName("wc");
27
28 job.setMapperClass(WordCountMapper.class);
29 job.setReducerClass(WordCountReducer.class);
30
31 job.setMapOutputKeyClass(Text.class);
32 job.setMapOutputValueClass(IntWritable.class);
33
34 FileInputFormat.addInputPath(job, new Path("/usr/input/wc/wc.txt"));//新建好输入路径,且数据源
35
36 Path outpath =new Path("/usr/output/wc");
37 if(fs.exists(outpath)){
38 fs.delete(outpath, true);
39 }
40 FileOutputFormat.setOutputPath(job, outpath);
41
42 boolean f= job.waitForCompletion(true);
43 if(f){
44 System.out.println("job任务执行成功");
45 }
46 } catch (Exception e) {
47 e.printStackTrace();
48 }
49 }
50 }
|
|
|