设为首页 收藏本站
查看: 696|回复: 0

[经验分享] 【Hadoop】 c++ && python 实现 Hadoop Streaming 的 partitioner 和 模块化

[复制链接]

尚未签到

发表于 2016-12-9 09:53:48 | 显示全部楼层 |阅读模式
  转自: http://www.cppblog.com/MemoryGarden/archive/2010/01/24/106312.html
  这些东西是我自己的理解,如果有错误的地方,或者有哪些地方走了弯路,请帮我指出我的错误,谢谢!
Hadoop Streaming 是一个工具, 代替编写Java的实现类,而利用可执行程序来完成map-reduce过程.
工作流程:InputFile --> mappers --> [Partitioner] --> reducers --> outputFiles
理解: 
1 输入文件,可以是指定远程文件系统目录(*代表所有文件)
2 通过集群自己分解到各个PC上,每个mapper是一个可执行文件,相应的启动一个进程,来实现你的逻辑
3 mapper的输入为标准输入,所以,任何能够支持标准输入的可执行的东西,c,c++(编译出来的可执行文件),python,......都可以作为mapper 和 reducer mapper的输出为标准输出,如果有Partitioner,就给它,如果没有,它的输出将作为reducer的输入
4 Partitioner 为可选的项,二次排序,可以对结果进行分类打到结果文件里面,它的输入是mapper的标准输出,它的输出,将作为reducer的标准输入
5 reducer 同 mapper
6 输出文件夹,在远端文件不能重名

Hadoop Streaming
  hadoop-streaming.jar 的位置 : $HADOOP_HOME/contrib/streaming 内,官方上面关于hadoop-streaming 的介绍已经很详细了,而且也有了关于python的例子,我就不说了,这里总结下自己的经验
1 指定 mapper or reducer 的 task 官方上说要用 -jobconf 但是这个参数已经过时,不可以用了,官方说要用 -D, 注意这个-D是要作为最开始的配置出现的,因为是在maper 和 reducer 执行之前,就需要硬性指定好的,所以要出现在参数的最前面 ./bin/hadoop jar hadoop-0.19.2-streaming.jar -D .........-input ........ 类似这样,这样,即使你程序最后只指定了一个输出管道,但是还是会有你指定的task数量的结果文件,只不过多余的就是空的,实验以下就知道了.
2 关于二次排序,由于是用的streaming 所以,在可执行文件内,只能够处理逻辑,还有就是输出,当然我们也可以指定二次排序,但是由于是全部参数化,不是很灵活。比如:
10.2.3.40    1
11.22.33.33    1
www.renren.com 1
www.baidu.com    1
10.2.3.40    1
这样一个很规整的输入文件,需求是要把记录独立的ip和url的count 但是输出文件要分分割出来。
官方网站的例子,是指定 key 然后对key 指定 主-key 和 key 用来排序,而 主-key 用来二次排序,这样会输出你想要的东西, 但是对于上面最简单的需求,对于传递参数,我们如何做呢?
其实我们还是可以利用这一点,在我们mapper 里面,还是按照/t来分割key value 但是我们要给key指定一个主-key 用来给Partitioner 来实现二次排序,所以我们可以稍微处理下这个KEY,我们可以简单的判断出来ip 和 url 的区别,这样,我们就人为的加上一个主-key 我们在mapper里面,给每个key人为的加上一个"标签",用来给partitioner做二次排序用,比如我们的mapper的输出是这样
D&10.2.3.40    1
D&11.22.33.33    1
W&www.renren.com 1
W&www.baidu.com    1
D&10.2.3.40    1
然后通过传递命令参数
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner //指定要求二次排序
-jobconf map.output.key.field.separator='&' //这里如果不加两个单引号的话我的命令会死掉
-jobconf num.key.fields.for.partition=1 //这里指第一个 & 符号来分割,保证不会出错
这样我们就可以通过 partitioner 来实现二次排序了
在reducer里面,我们再把"标签"摘掉(不费吹灰之力)就可以做到悄无声息的完成二次排序了。

关于模块化

(强调 : 没有在集群上测试,只在单机上做测试)
程序员最悲剧的就是不能代码复用,做这个也一样,用hadoop-streaming 也一样,要做到代码重用,是我第一个考虑的问题
当我看到 -file(详细可以看官方网站上的讲解) 的时候,我就想到利用这个东西,果然,我的在本机上建立了一个py模块,简单的一个函数
然后在我的mapper里面import 它,本地测试通过后,利用-file 把模块所在的问价夹用 -file moudle/* 这个参数,传入streaming
执行的结果毫无错误,这样,我们就可以抽象出来一些模块的东西,来实现我们模块化的需求
注 : 不要忘记 chmod +x *.py  将py 变成可执行的,不然不可以运行
代码 : 
1: 模块代码 mg.py 用来给 mapper 贴标签
  def mgFunction(line):
        if(line[0] >= '0' and line[0] <= '9'):
                return "D&" + line
        return "W&" + line
2: mapper.py
  #!/usr/bin/env python
import sys
sys.path.append('/home/liuguoqing/Desktop/hadoop-0.19.2/moudle')
import mg
for line in sys.stdin:
        line = mg.mgFunction(line)
        line = line.strip()
#       print line
        words = line.split()
        print '%st%s' % (words[0], words[1])

  3: reducer.py
  #!/usr/bin/env python
import sys
user_login_day = {}
for line in sys.stdin:
        line = line[2:]//去掉帽子
        line = line.strip()
        userid, day = line.split('t', 1)
        user_login_day[userid] = user_login_day.get(userid, 0) + 1
for uid in user_login_day.keys():
        print '%st%d' % (uid, user_login_day[uid])
  这样就实现了模块化的可以二次排序的hadoop-streaming,命令如下

   1: ./bin/hadoop jar hadoop-0.19.2-streaming.jar
   2: #streaming jar
   3: -D mapred.reduce.tasks=2  
   4: #指定2个reduce来处理
   5: -input user_login_day-input2/*  
   6: #指定输入文件 可以用 dir/* 方式
   7: -output user_login_day-output102
   8: #指定输出文件夹
   9: -mapper ~/Desktop/hadoop-0.19.2/python/mapper/get_user_login_day_back.py  
  10: #指定mapper 可执行文件 我用全路径,好像用相对路径会出错...
  11: -reducer ~/Desktop/hadoop-0.19.2/python/reducer/get_user_login_day_back.py
  12: #指定reducer 可执行文件 
  13: -file ~/Desktop/hadoop-0.19.2/moudle/*
  14: #指定模块化的库文件 dir/* 模式
  15: -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
  16: #指定 partitioner 参数为class
  17: -jobconf map.output.key.field.separator='&'
  18: #指定 主-key 的分割符号为 '&'
  19: -jobconf num.key.fields.for.partition=1



  #指定为第一个‘&’
liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop jar hadoop-0.19.2-streaming.jar -D mapred.reduce.tasks=2 -input user_login_day-input2/* -output user_login_day-output102 -mapper ~/Desktop/hadoop-0.19.2/python/mapper/get_user_login_day_back.py -reducer ~/Desktop/hadoop-0.19.2/python/reducer/get_user_login_day_back.py -file ~/Desktop/hadoop-0.19.2/moudle/* -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -jobconf map.output.key.field.separator='&' -jobconf num.key.fields.for.partition=1
10/01/24 03:19:15 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [/home/liuguoqing/Desktop/hadoop-0.19.2/moudle/mg.py, /home/liuguoqing/Desktop/hadoop-0.19.2/moudle/mg.pyc, /tmp/hadoop-liuguoqing/hadoop-unjar6780057097425964518/] [] /tmp/streamjob3100401358387519950.jar tmpDir=null
10/01/24 03:19:15 INFO mapred.FileInputFormat: Total input paths to process : 2
10/01/24 03:19:15 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-liuguoqing/mapred/local]
10/01/24 03:19:15 INFO streaming.StreamJob: Running job: job_201001221008_0065
10/01/24 03:19:15 INFO streaming.StreamJob: To kill this job, run:
10/01/24 03:19:15 INFO streaming.StreamJob: /home/liuguoqing/Desktop/hadoop-0.19.2/bin/../bin/hadoop job  -Dmapred.job.tracker=hdfs://localhost:9881 -kill job_201001221008_0065
10/01/24 03:19:15 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201001221008_0065
10/01/24 03:19:16 INFO streaming.StreamJob:  map 0%  reduce 0%
10/01/24 03:19:17 INFO streaming.StreamJob:  map 33%  reduce 0%
10/01/24 03:19:18 INFO streaming.StreamJob:  map 67%  reduce 0%
10/01/24 03:19:19 INFO streaming.StreamJob:  map 100%  reduce 0%
10/01/24 03:19:27 INFO streaming.StreamJob:  map 100%  reduce 50%
10/01/24 03:19:32 INFO streaming.StreamJob:  map 100%  reduce 100%
10/01/24 03:19:32 INFO streaming.StreamJob: Job complete: job_201001221008_0065
10/01/24 03:19:32 INFO streaming.StreamJob: Output: user_login_day-output102
liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop dfs -ls user_login_day-output102
Found 3 items
drwxr-xr-x   - liuguoqing supergroup          0 2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/_logs
-rw-r--r--   1 liuguoqing supergroup         25 2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/part-00000
-rw-r--r--   1 liuguoqing supergroup         47 2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/part-00001
liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop dfs -cat user_login_day-output102/part-00000
54321    2
99999    1
12345    12
liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop dfs -cat user_login_day-output102/part-00001
http://www.renren.com    3
http://www.baidu.com    3
以上为操作结果显示

4 : c++ 的应用
  只要写两个个标准输入输出的mapper reducer,然后
g++ mapper.cpp -o mapper
g++ reducer.cpp -o reducer
生成的两个可执行的 mapper reducer 的文件作为mapper 和 reducer 参数就可以了,执行的命令和上面是一样的. 代码如下

   1: mapper.cpp
   2: 
   3: #include <stdio.h>
   4: #include <string>
   5: #include <iostream>
   6: using namespace std;
   7: int main(){
   8:         string key;
   9:         string value;
  10:         while(cin>>key){
  11:                 cin>>value;
  12:                 cout<<key<<"t"<<value<<endl;
  13:         }
  14:         return 0;
  15: }
  16: 
  17: reducer.cpp
  18: 
  19: #include <stdio.h>
  20: #include <string>
  21: #include <map>
  22: #include <iostream>
  23: using namespace std;
  24: int main(){
  25:         string key;
  26:         string value;
  27:         map<string, int> word2count;
  28:         map<string, int> :: iterator it;
  29:         while(cin>>key){
  30:                 cin>>value;
  31:                 it = word2count.find(key);
  32:                 if(it != word2count.end()){
  33:                         ++it->second;
  34:                 }
  35:                 else{
  36:                         word2count.insert(make_pair(key, 1));
  37:                         it->second = 0;
  38:                 }
  39:         }
  40:         for(it = word2count.begin(); it != word2count.end(); ++it){
  41:                 cout<<it->first<<"t"<<it->second<<endl;
  42:         }
  43:         return 0;
  44: }
  45: 



  这样就可以利用c++来编写 hadoop map-reduce了。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-311811-1-1.html 上篇帖子: Could not locate executable null\bin\winutils.exe in the hadoop binary path 下篇帖子: hadoop 0.21.0 配置 安装 部署
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表