|
1 package zhouls.bigdata.myMapReduce.TVPlayCount;
2
3 import java.io.IOException;
4
5 import javax.swing.JComboBox.KeySelectionManager;
6
7 import org.apache.hadoop.conf.Configuration;
8 import org.apache.hadoop.conf.Configured;
9 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
19 import org.apache.hadoop.util.Tool;
20 import org.apache.hadoop.util.ToolRunner;
21
22 /**
23 * @input params 各类网站每天每部电影的点播量、收藏量、 评论数 、反票数和支持数等数据的统计
24 * @ouput params 分别输出每个网站 每部电视剧总的统计数据相关情况
25 * @author zhouls
26 * @function 自定义FileInputFormat 将电视剧的统计数据 根据不同网站以MultipleOutputs 输出到不同的文件夹下
27 */
28 public>
29 /**
30 * @input Params Text TvPlayData
31 * @output Params Text TvPlayData
32 * @author zhouls
33 * @function 直接输出
34 */
35 public static>
36 //k1,v1 k2,v2 k1是Text型,v1是TVPlayData型,k2是Text型,v2是TVPlayData型
37
38 // 因为,v1我们想要的是, 5个网站的 每天电视剧的 播放量 收藏数 评论数 踩数 赞数。这是hadoop自带类型无法满足到的,所以,我们得自定义,即TVPlayData,专门来写。
39 // 里面的数据类型,五个指标放到一块输出的,那么这是什么类型?是不是hadoop默认的,无法满足,所以我们得自定义数据类型。
40
41 @Override
42 protected void map(Text key, TVPlayData value, Context context)throws IOException, InterruptedException{
43 // k1,默认情况是行偏移量,当然也可以自定义key,即不是行偏移量,如这里。
44
45
46 // key=Text
47 // Key是继承者们 1
48
49 // value=TVPlayData
50 // value是zhouls.bigdata.myMapReduce.TVPlayCount.TVPlayData@7b992254
51 // value是zhouls.bigdata.myMapReduce.TVPlayCount.TVPlayData@719349c5,注意,这个每次不一样
52
53
54 // key=Text
55 // 继承者们精华版 4
56
57 // value=TVPlayData
58 // zhouls.bigdata.myMapReduce.TVPlayCount.TVPlayData@7b992254
59
60 context.write(key,value);//写入key是k2,value是v2
61 // 直接TVPlayData那边自定义好了
62
63 // context.write(new Text(key),new TVPlayData(value));//等价
64
65 }
66 }
67
68
69
70 /**
71 * @input Params Text TvPlayData
72 * @output Params Text Text
73 * @author zhouls
74 * @fuction 统计每部电视剧的 点播数 收藏数等 按source输出到不同文件夹下
75 */
76 public static>
77 private Text m_key = new Text();
78 private Text m_value = new Text();
79
80 private MultipleOutputs<Text, Text> mos;//MultipleOutputs<output Key , output Value >是hadoop里的多目录输出或者说将结果输出到多个文件或目录下
81 // MultipleOutputs,这是MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中MultipleOutputs与MultipleOutputFormat的一个整合)。
82
83 // Hadoop每一个Reducer产生一个输出文件,文件以part-r-00000,part-r-00001的方式进行命名,
84 // 如果需要人为的控制输出文件的命名或者每一个Reducer需要写出多个输出文件时,可以采用MultipleOutputs类来完成,
85 // MultipleOutputs采用输出记录的键值对(output Key和output Value)或者任意字符串来生成输出文件的名字,
86 // 文件一般以name-r-nnnnn的格式进行命名,其中name是程序设计的任意名字;nnnnn表示分区号。
87
88
89 protected void setup(Context context) throws IOException,InterruptedException{
90 mos = new MultipleOutputs<Text, Text>(context);//强制转换,将context的值,转换为MultipleOutputs<k3 , v3 >类型
91 }
92
93 protected void reduce(Text Key, Iterable<TVPlayData> Values,Context context) throws IOException, InterruptedException{
94 int daynumber = 0;
95 int collectnumber = 0;
96 int commentnumber = 0;
97 int againstnumber = 0;
98 int supportnumber = 0;
99
100
101 //迭代器
102 for (TVPlayData tv : Values) {//星型for循环,即把Values的值一一传给TVPlayData tv
103 daynumber += tv.getDaynumber();//拿值
104 collectnumber += tv.getCollectnumber();
105 commentnumber += tv.getCommentnumber();
106 againstnumber += tv.getAgainstnumber();
107 supportnumber += tv.getSupportnumber();
108 }
109
110 //24小时第一季 1 258962 124 48 2 10
111 //tvname source 播放量 收藏数 评论数 踩数 赞数
112 //1代表优酷,2代表搜狐,3代表土豆,4代表爱奇艺,5代表迅雷看看
113 String[] records = Key.toString().split("\t");//转换成String数组类型
114
115 String source = records[1];// 媒体类别
116
117 m_key.set(records[0]);//赋值,将records[0]的值,给,m_key
118
119
120 m_value.set(daynumber + "\t" + collectnumber + "\t" + commentnumber
121 + "\t" + againstnumber + "\t" + supportnumber);
122 //赋值,将daynumber + "\t" + collectnumber + "\t" + commentnumber+ "\t" + againstnumber + "\t" + supportnumber的值,给m_value
123
124
125 if (source.equals("1")) {//判别1代表优酷,2代表搜狐,3代表土豆,4代表爱奇艺,5代表迅雷看看
126 mos.write("youku", m_key, m_value);//写入,将"youku", m_key, m_value,写入到mos
127 } else if (source.equals("2")) {
128 mos.write("souhu", m_key, m_value);//写入
129 } else if (source.equals("3")) {
130 mos.write("tudou", m_key, m_value);//写入
131 } else if (source.equals("4")) {
132 mos.write("aiqiyi", m_key, m_value);//写入
133 } else if (source.equals("5")) {
134 mos.write("xunlei", m_key, m_value);//写入
135 // m_key=Text
136 // m_key是龟岩许浚
137 //
138 // m_value=Text
139 // m_value是740014 0 0 44 84
140 }
141 }
142
143
144 protected void cleanup(Context context) throws IOException,InterruptedException{
145 // context=.WrappedReducer$Context
146 // context是org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@4c566d9b
147
148
149 mos.close();//写完后,要关闭
150 }
151
152
153 }
154
155
156 public int run(String[] args) throws Exception {
157 Configuration conf = new Configuration();// 配置文件对象
158 Path mypath = new Path(args[1]);
159 FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径
160 if (hdfs.isDirectory(mypath)) {
161 hdfs.delete(mypath, true);
162 }
163
164 Job job = new Job(conf, "tvplay");// 构造任务
165 job.setJarByClass(TVPlayCount.class);// 设置主类
166
167 job.setMapperClass(TVPlayMapper.class);// 设置Mapper
168 job.setMapOutputKeyClass(Text.class);// key输出类型
169 job.setMapOutputValueClass(TVPlayData.class);// value输出类型
170 job.setInputFormatClass(TVPlayInputFormat.class);//自定义输入格式
171
172 job.setReducerClass(TVPlayReducer.class);// 设置Reducer
173 job.setOutputKeyClass(Text.class);// reduce key类型
174 job.setOutputValueClass(Text.class);// reduce value类型
175 // 自定义文件输出格式
176 MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class,
177 Text.class, Text.class);
178 MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class,
179 Text.class, Text.class);
180 MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class,
181 Text.class, Text.class);
182 MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class,
183 Text.class, Text.class);
184 MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class,
185 Text.class, Text.class);
186
187 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
188 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
189 job.waitForCompletion(true);
190 return 0;
191 }
192 public static void main(String[] args) throws Exception {
193
194 //集群模式下
195 String[] args0 = { "hdfs://HadoopMaster:9000/inputData/fiveTvWebsiteDlay/tvplay.txt",
196 "hdfs://HadoopMaster:9000/outData/fiveTvWebsiteDlay/" };
197
198 //本地模式下
199 // String[] args0 = { "./data/tvplay/tvplay.txt",
200 // "./out/tvplay" };
201
202
203 int ec = ToolRunner.run(new Configuration(), new TVPlayCount(), args0);
204 System.exit(ec);
205 }
206 }
207 |
|