设为首页 收藏本站
查看: 1289|回复: 0

[经验分享] Using Python to write Hadoop MapReduce program(Reship)

[复制链接]

尚未签到

发表于 2016-12-11 08:31:46 | 显示全部楼层 |阅读模式
  转载自:http://isilic.iyunv.com/blog/1750271
使用 Python 编写 Hadoop MapReduce 程序

 

以前写 Hadoop 的 MapReduce 程序时,使用的是 Java ,利用 Java 写起来是轻车熟路,没有问题,但是使用 Java 很明显的一个弊端就是每次都要编码、打包、上传、执行,还真心是麻烦,想要更加简单的使用 Hadoop 的运算能力,想要写 MapReduce 程序不那么复杂。还真是个问题。
仔细考虑了下,熟悉的 Python 又得拿起来了,随便搜了下 Python 编写 MapReduce 程序,看了个教程,发现用起来真是方便,遂记录之。
 
 
Hadoop 框架使用 Java 开发的,对 Java 进行了原生的支持,不过对于其它语言也提供了 API 支持,如 Python 、C++ 、 Perl 、 Ruby 等。这个工具就是 Hadoop Streaming ,顾名思义, Streaming 就是 Pipe 操作,说起 pipe ,大家肯定不陌生。最原生的 Python 支持是需要 Jython 支持的,不过这里有额外的方法来实现,大家如果只是使用的话,不用纠结 Jython 转换的问题。
 
 
 
前置条件:
Python 环境
Hadoop 环境( single or cluster )
 
最容易的 Hadoop 编程模型就是 Mapper 和 Reducer 的编写,这种编程模型大大降低了我们对于并发、同步、容错、一致性的要求,你只要编写好自己的业务逻辑,就可以提交任务。然后喝杯茶,结果就出来了,前提是你的业务逻辑没有错误。
使用 Hadoop Streaming ,能够利用 Pipe 模型,而使用 Python 的巧妙之处在于处理输入输出的数据使用的是 STDIN 和 STDOUT ,然后 Hadoop Streaming 会接管一切,转化成 MapReduce 模型。
 
我们还是使用 wordcount 例子,具体内容不再详细解释,如果有不理解的可以自行度之。下面我们先看下 mapper的代码:
 
Python代码   DSC0000.png


  • #!/usr/bin/env python  
  •   
  • import sys  
  • #input comes from STDIN (standard input)  
  • for line in sys.stdin:  
  •     # remove leading and trailing whitespace  
  •     line = line.strip()  
  •     # split the line into words  
  •     words = line.split()  
  •     # increase counters  
  •     for word in words:  
  •         # write the results to STDOUT (standard output);  
  •         # what we output here will be the input for the  
  •         # Reduce step, i.e. the input for reducer.py  
  •         # tab-delimited; the trivial word count is 1  
  •         print '%s\t%s' % (word, 1)  

 
简单解释一下,输入从 sys.stdin 进入,然后进行分割操作,对于每行的分割结果,打印出 word 和 count=1, Mapper 就这么简单。
大家看完 Mapper 之后,会产生疑问,这个怎么能够实现 mapper 功能?我们跳出这个 sys.stdin 模型,再回顾下 MapReduce 的程序。在 Mapper 中,程序不关心你怎么输入,只关心你的输出,这个 Mapper 代码会被放到各个 slave 机器上,去执行 Mapper 过程,其实可以理解为过滤、处理。
在示例中,程序的输入会被进行一系列的处理过程,得到 word 和 count ,这个就是 slave 机器上的数据处理之后的内容。仔细理解下这个过程,对于开发程序还是相当有帮助的。
 
下面我们来看下 Reduce 程序, wordcount 的 reduce 程序就是统计相同 word 的 count 数目,然后再输出。我们还是直接上代码吧:
Python代码  


  • #!/usr/bin/env python  
  • from operator import itemgetter  
  • import sys  
  •   
  • current_word = None  
  • current_count = 0  
  • word = None  
  • # input comes from STDIN  
  • for line in sys.stdin:  
  •     # remove leading and trailing whitespace  
  •     line = line.strip()  
  •     # parse the input we got from mapper.py  
  •     word, count = line.split('\t', 1)  
  •     # convert count (currently a string) to int  
  •     try:  
  •         count = int(count)  
  •     except ValueError:  
  •         # count was not a number, so silently  
  •         # ignore/discard this line  
  •         continue  
  •     # this IF-switch only works because Hadoop sorts map output  
  •     # by key (here: word) before it is passed to the reducer  
  •     if current_word == word:  
  •         current_count += count  
  •     else:  
  •         if current_word:  
  •             # write result to STDOUT  
  •             print '%s\t%s' % (current_word, current_count)  
  •         current_count = count  
  •         current_word = word  
  • # do not forget to output the last word if needed!  
  • if current_word == word:  
  •     print '%s\t%s' % (current_word, current_count)  

 
 
看完这个reduce代码,执行一下,完全没有问题,但是未必真正能理解这个reduce的内容,我来解释一下,明确知道执行流程的可以跳过。
reduce的代码页不复杂,利用Reduce程序,可以得出count数目。如果当前的词和分出来的词一致的话,count相加,如果不一致的话,就打印出来,同时更新输入的wordcount。最后的if是打印出最后一次统计结果。
reduce的执行依赖了MapReduce模型一个要点,在Shuffle过程中,同一个key会放到同一个reduce任务中,这样处理的是一系列连续的相同的key值,当key不一样的时候,就是说开始统计下一个word了。
  <!--EndFragment-->
 
 
利用PythonMapReduce程序就这么多内容,更细节的内容和自己处理的业务相关。
下面测试下结果:
  <!--EndFragment-->
Shell代码  


  • echo "foo foo quux labs foo bar quux" | python ./mapper.py  
  •   foo     1  
  •   foo     1  
  •   quux    1  
  •   labs    1  
  •   foo     1  
  •   bar     1  
  •   quux    1  

 
 进一步可以看到
 
Java代码  


  • echo "foo foo quux labs foo bar quux" | python ./mapper.py | sort -k1,1 | ./reducer.py  
  •   bar     1  
  •   foo     3  
  •   labs    1  
  •   quux    2  

 
下面就是执行Hadoop命令了,在使用Hadoop Streaming时,要使用一定的格式操作才能提交任务。
<!--EndFragment--> 
 
 
Hadoop代码  


  • hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar –mapper mapperfile –file mapper_file_path –reducer reducefile –file reducer_file_path –input input_path –output output_path  

 
将自己的mapperreducer代码代入上面命令中,执行一下看结果是否正确。
  <!--EndFragment-->
 
本文的最后列一下Hadoop Streaming操作的参数,以作备忘。
<!--EndFragment-->
Java代码  


  • Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar [options]  
  •  Options:  
  •    -input    <path>                   DFS input file(s) for the Map step  
  •    -output   <path>                   DFS output directory for the Reduce step  
  •    -mapper   <cmd|JavaClassName>      The streaming command to run  
  •    -combiner <JavaClassName>          Combiner has to be a Java class  
  •    -reducer  <cmd|JavaClassName>      The streaming command to run  
  •    -file     <file>                   File/dir to be shipped in the Job jar file  
  •    -dfs    <h:p>|local                Optional. Override DFS configuration  
  •    -jt     <h:p>|local                Optional. Override JobTracker configuration  
  •    -additionalconfspec specfile       Optional.  
  •    -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.  
  •    -outputformat TextOutputFormat(default)|JavaClassName  Optional.  
  •    -partitioner JavaClassName         Optional.  
  •    -numReduceTasks <num>              Optional.  
  •    -inputreader <spec>                Optional.  
  •    -jobconf  <n>=<v>                  Optional. Add or override a JobConf property  
  •    -cmdenv   <n>=<v>                  Optional. Pass env.var to streaming commands  
  •    -cacheFile fileNameURI  
  •    -cacheArchive fileNameURI  
  •    -verbose  

 
下面简单说下参数的意思:
-input:DFS输入,可以有多个input输入,不过我一般喜欢把输入用逗号{,}分割。
-output:DFS输入,实际上就是Reducer输出
-mapper:MapReduce中的Mapper,看清楚了,也可以是cmd shell命令
-combiner:这个必须是Java
-reducer:MapReducer中的Reducer,也可以是shell命令
-file:这个file参数是用来提交本地的文件,如本地的mapper或者reducer
-dfs:这个是可选的,用来覆盖DFS设定。
-jt:用来覆盖jobtracker的设定
-inputformat:输入格式设定
-outputformat:输出文件的格式设定
 
 
上面的这些参数已经足够平时的应用了,如果有更为细节的需求,就要考虑Streaming是否合适,是否适应自己的业务逻辑。
 
最后再说一句:按照Hadoop Streaming的执行流程,这些参数应该足够了,但是如果我有更复杂的需求:如根据key值分离文件;根据key值重命名文件;读取HDFS上文件配置数据;从多个数据源中读取mapper数据,如HDFS、DataBase、HBase、Nosql等,这些比较灵活的应用使用Python Streaming都有限制,或者是我暂时还没有看到这块。但是目前来说,使用Hadoop Streaming操作能够大量减少代码和流程,比使用Java要方便许多,特别是对于日常的、临时的统计工作。
 
只有更复杂的统计工作和Hadoop Streaming特性,留待以后再行发掘。

  <!--EndFragment-->
 
 
<!--EndFragment--> 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312545-1-1.html 上篇帖子: Hadoop集群搭建之jdk和ssh无密码配置 下篇帖子: FineReport中hadoop,hive数据库连接解决方案
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表