设为首页 收藏本站
查看: 950|回复: 0

[经验分享] Hadoop MapReduce编程 API入门系列之自定义多种输入格式数据类型和多种输出格式分析(三十一)

[复制链接]

尚未签到

发表于 2017-12-18 15:01:40 | 显示全部楼层 |阅读模式
1 package zhouls.bigdata.myMapReduce.TVPlayCount;  

  2  
  3 import java.io.IOException;
  
  4
  
  5 import javax.swing.JComboBox.KeySelectionManager;
  
  6
  
  7 import org.apache.hadoop.conf.Configuration;
  
  8 import org.apache.hadoop.conf.Configured;
  
  9 import org.apache.hadoop.fs.FileSystem;
  
10 import org.apache.hadoop.fs.Path;
  
11 import org.apache.hadoop.io.Text;
  
12 import org.apache.hadoop.mapreduce.Job;
  
13 import org.apache.hadoop.mapreduce.Mapper;
  
14 import org.apache.hadoop.mapreduce.Reducer;
  
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
17 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
  
18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  
19 import org.apache.hadoop.util.Tool;
  
20 import org.apache.hadoop.util.ToolRunner;
  
21
  
22 /**
  
23  * @input params 各类网站每天每部电影的点播量、收藏量、 评论数 、反票数和支持数等数据的统计
  
24  * @ouput params 分别输出每个网站 每部电视剧总的统计数据相关情况
  
25  * @author zhouls
  
26  * @function 自定义FileInputFormat 将电视剧的统计数据 根据不同网站以MultipleOutputs 输出到不同的文件夹下
  
27  */

  
28 public>  
29     /**
  
30      * @input Params Text TvPlayData
  
31      * @output Params Text TvPlayData
  
32      * @author zhouls
  
33      * @function 直接输出
  
34      */

  
35     public static>  
36         //k1,v1 k2,v2  k1是Text型,v1是TVPlayData型,k2是Text型,v2是TVPlayData型
  
37         
  
38 //        因为,v1我们想要的是, 5个网站的 每天电视剧的 播放量 收藏数 评论数 踩数 赞数。这是hadoop自带类型无法满足到的,所以,我们得自定义,即TVPlayData,专门来写。
  
39 //        里面的数据类型,五个指标放到一块输出的,那么这是什么类型?是不是hadoop默认的,无法满足,所以我们得自定义数据类型。
  
40         
  
41         @Override
  
42         protected void map(Text key, TVPlayData value, Context context)throws IOException, InterruptedException{   
  
43 //            k1,默认情况是行偏移量,当然也可以自定义key,即不是行偏移量,如这里。
  
44
  
45            
  
46 //            key=Text
  
47 //            Key是继承者们    1
  
48            
  
49 //            value=TVPlayData
  
50 //            value是zhouls.bigdata.myMapReduce.TVPlayCount.TVPlayData@7b992254
  
51 //            value是zhouls.bigdata.myMapReduce.TVPlayCount.TVPlayData@719349c5,注意,这个每次不一样
  
52            
  
53            
  
54 //            key=Text
  
55 //            继承者们精华版    4
  
56            
  
57 //            value=TVPlayData
  
58 //            zhouls.bigdata.myMapReduce.TVPlayCount.TVPlayData@7b992254
  
59            
  
60             context.write(key,value);//写入key是k2,value是v2
  
61 //            直接TVPlayData那边自定义好了
  
62            
  
63 //            context.write(new Text(key),new TVPlayData(value));//等价        
  
64            
  
65         }
  
66     }
  
67     
  
68     
  
69     
  
70     /**
  
71      * @input Params Text TvPlayData
  
72      * @output Params Text Text
  
73      * @author zhouls
  
74      * @fuction 统计每部电视剧的 点播数 收藏数等  按source输出到不同文件夹下
  
75      */

  
76     public static>  
77         private Text m_key = new Text();
  
78         private Text m_value = new Text();
  
79     
  
80         private MultipleOutputs<Text, Text> mos;//MultipleOutputs<output Key , output Value >是hadoop里的多目录输出或者说将结果输出到多个文件或目录下
  
81 //        MultipleOutputs,这是MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中MultipleOutputs与MultipleOutputFormat的一个整合)。
  
82
  
83 //        Hadoop每一个Reducer产生一个输出文件,文件以part-r-00000,part-r-00001的方式进行命名,
  
84 //        如果需要人为的控制输出文件的命名或者每一个Reducer需要写出多个输出文件时,可以采用MultipleOutputs类来完成,
  
85 //        MultipleOutputs采用输出记录的键值对(output Key和output Value)或者任意字符串来生成输出文件的名字,
  
86 //        文件一般以name-r-nnnnn的格式进行命名,其中name是程序设计的任意名字;nnnnn表示分区号。
  
87         
  
88         
  
89         protected void setup(Context context) throws IOException,InterruptedException{   
  
90             mos = new MultipleOutputs<Text, Text>(context);//强制转换,将context的值,转换为MultipleOutputs<k3 , v3 >类型
  
91         }
  
92
  
93         protected void reduce(Text Key, Iterable<TVPlayData> Values,Context context) throws IOException, InterruptedException{
  
94             int daynumber = 0;
  
95             int collectnumber = 0;
  
96             int commentnumber = 0;
  
97             int againstnumber = 0;
  
98             int supportnumber = 0;
  
99            
  
100            
  
101             //迭代器
  
102             for (TVPlayData tv : Values) {//星型for循环,即把Values的值一一传给TVPlayData tv
  
103                 daynumber += tv.getDaynumber();//拿值
  
104                 collectnumber += tv.getCollectnumber();
  
105                 commentnumber += tv.getCommentnumber();
  
106                 againstnumber += tv.getAgainstnumber();
  
107                 supportnumber += tv.getSupportnumber();
  
108             }
  
109            
  
110             //24小时第一季    1    258962    124    48    2    10
  
111             //tvname  source    播放量 收藏数 评论数 踩数 赞数
  
112             //1代表优酷,2代表搜狐,3代表土豆,4代表爱奇艺,5代表迅雷看看
  
113             String[] records = Key.toString().split("\t");//转换成String数组类型
  
114            
  
115             String source = records[1];// 媒体类别
  
116            
  
117             m_key.set(records[0]);//赋值,将records[0]的值,给,m_key
  
118            
  
119            
  
120             m_value.set(daynumber + "\t" + collectnumber + "\t" + commentnumber
  
121                     + "\t" + againstnumber + "\t" + supportnumber);
  
122             //赋值,将daynumber + "\t" + collectnumber + "\t" + commentnumber+ "\t" + againstnumber + "\t" + supportnumber的值,给m_value
  
123            
  
124            
  
125             if (source.equals("1")) {//判别1代表优酷,2代表搜狐,3代表土豆,4代表爱奇艺,5代表迅雷看看
  
126                 mos.write("youku", m_key, m_value);//写入,将"youku", m_key, m_value,写入到mos
  
127             } else if (source.equals("2")) {
  
128                 mos.write("souhu", m_key, m_value);//写入
  
129             } else if (source.equals("3")) {
  
130                 mos.write("tudou", m_key, m_value);//写入
  
131             } else if (source.equals("4")) {
  
132                 mos.write("aiqiyi", m_key, m_value);//写入
  
133             } else if (source.equals("5")) {
  
134                 mos.write("xunlei", m_key, m_value);//写入
  
135 //                m_key=Text
  
136 //                m_key是龟岩许浚
  
137 //               
  
138 //                m_value=Text
  
139 //                m_value是740014    0    0    44    84
  
140             }
  
141         }
  
142
  
143         
  
144         protected void cleanup(Context context) throws IOException,InterruptedException{
  
145 //            context=.WrappedReducer$Context
  
146 //            context是org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@4c566d9b
  
147            
  
148            
  
149             mos.close();//写完后,要关闭
  
150         }
  
151         
  
152         
  
153     }
  
154
  
155     
  
156     public int run(String[] args) throws Exception {
  
157         Configuration conf = new Configuration();// 配置文件对象
  
158         Path mypath = new Path(args[1]);
  
159         FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径
  
160         if (hdfs.isDirectory(mypath)) {
  
161             hdfs.delete(mypath, true);
  
162         }
  
163
  
164         Job job = new Job(conf, "tvplay");// 构造任务
  
165         job.setJarByClass(TVPlayCount.class);// 设置主类
  
166
  
167         job.setMapperClass(TVPlayMapper.class);// 设置Mapper
  
168         job.setMapOutputKeyClass(Text.class);// key输出类型
  
169         job.setMapOutputValueClass(TVPlayData.class);// value输出类型
  
170         job.setInputFormatClass(TVPlayInputFormat.class);//自定义输入格式
  
171
  
172         job.setReducerClass(TVPlayReducer.class);// 设置Reducer
  
173         job.setOutputKeyClass(Text.class);// reduce key类型
  
174         job.setOutputValueClass(Text.class);// reduce value类型
  
175         // 自定义文件输出格式
  
176         MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class,
  
177                 Text.class, Text.class);
  
178         MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class,
  
179                 Text.class, Text.class);
  
180         MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class,
  
181                 Text.class, Text.class);
  
182         MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class,
  
183                 Text.class, Text.class);
  
184         MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class,
  
185                 Text.class, Text.class);
  
186         
  
187         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
  
188         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
  
189         job.waitForCompletion(true);
  
190         return 0;
  
191     }
  
192     public static void main(String[] args) throws Exception {
  
193         
  
194         //集群模式下
  
195         String[] args0 = { "hdfs://HadoopMaster:9000/inputData/fiveTvWebsiteDlay/tvplay.txt",
  
196                 "hdfs://HadoopMaster:9000/outData/fiveTvWebsiteDlay/" };
  
197         
  
198         //本地模式下
  
199 //        String[] args0 = { "./data/tvplay/tvplay.txt",
  
200 //                            "./out/tvplay" };
  
201         
  
202         
  
203         int ec = ToolRunner.run(new Configuration(), new TVPlayCount(), args0);
  
204         System.exit(ec);
  
205     }
  
206 }
  
207

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425421-1-1.html 上篇帖子: Hadoop HDFS编程 API入门系列之RPC版本1(八) 下篇帖子: Hadoop MapReduce编程 API入门系列之自定义多种输入格式数据类型和多种输出格式分析(三十一)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表