设为首页 收藏本站
查看: 1344|回复: 0

[经验分享] 如何在Hadoop中使用Streaming编写MapReduce(转帖)

[复制链接]

尚未签到

发表于 2018-11-1 09:47:15 | 显示全部楼层 |阅读模式
  作者:马士华 发表于:2008-03-05 12:51 最后更新于:2008-03-25 11:18
  
版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处和作者信息。
  
http://www.hadoop.org.cn/hadoop/hadoop-streaming/
  Michael G. Noll在他的Blog中提到如何在Hadoop中用Python编写MapReduce程序,韩国的gogamza在其Bolg中也提到如何用C编写MapReduce程序(我稍微修改了一下原程序,因为他的Map对单词切分使用tab键)。我合并他们两人的文章,也让国内的Hadoop用户能够使用别的语言来编写MapReduce程序。
  首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二 安装部署)。Hadoop Streaming帮 助我们用非Java的编程语言使用MapReduce,Streaming用STDIN (标准输入)和STDOUT  (标准输出)来和我们编写的Map和Reduce进行数据的交换数据。任何能够使用STDIN和STDOUT都可以用来编写MapReduce程序,比如 我们用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。
  我们还是使用Hadoop的例子WordCount来做示范如何编写MapReduce,在WordCount的例子中我们要解决计算在一批文档中每一个单词的出现频率。首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组。并对这个数组遍历按" 1"用标准的输出输出来,代表这个单词出现了一次。在Reduce中我们来统计单词的出现频率。
  Python Code
  Map: mapper.py
  

  

  
#!/usr/bin/env python
  

  
import sys
  

  
# maps words to their counts
  
word2count = {}
  

  
# input comes from STDIN (standard input)
  
for line in sys.stdin:
  
    # remove leading and trailing whitespace
  
    line = line.strip()
  
    # split the line into words while removing any empty strings
  
    words = filter(lambda word: word, line.split())
  
    # increase counters
  
    for word in words:
  
        # write the results to STDOUT (standard output);
  
        # what we output here will be the input for the
  
        # Reduce step, i.e. the input for reducer.py
  
        #
  
        # tab-delimited; the trivial word count is 1
  
        print '%s\t%s' % (word, 1)
  

  

  Reduce: reducer.py
  

  

  
#!/usr/bin/env python
  

  
from operator import itemgetter
  
import sys
  

  
# maps words to their counts
  
word2count = {}
  

  
# input comes from STDIN
  
for line in sys.stdin:
  
    # remove leading and trailing whitespace
  
    line = line.strip()
  

  
    # parse the input we got from mapper.py
  
    word, count = line.split()
  
    # convert count (currently a string) to int
  
    try:
  
        count = int(count)
  
        word2count[word] = word2count.get(word, 0) + count
  
    except ValueError:
  
        # count was not a number, so silently
  
        # ignore/discard this line
  
        pass
  

  
# sort the words lexigraphically;
  
#
  
# this step is NOT required, we just do it so that our
  
# final output will look more like the official Hadoop
  
# word count examples
  
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
  

  
# write the results to STDOUT (standard output)
  
for word, count in sorted_word2count:
  
    print '%s\t%s'% (word, count)
  

  

  C Code
  Map: Mapper.c
  

  

  
#include
  
#include
  
#include
  
#include
  

  
#define BUF_SIZE        2048
  
#define DELIM   "\n"
  

  
int main(int argc, char *argv[]){
  
     char buffer[BUF_SIZE];
  
     while(fgets(buffer, BUF_SIZE - 1, stdin)){
  
            int len = strlen(buffer);
  
            if(buffer[len-1] == '\n')
  
             buffer[len-1] = 0;
  

  
            char *querys  = index(buffer, ' ');
  
            char *query = NULL;
  
            if(querys == NULL) continue;
  
            querys += 1; /*  not to include '\t' */
  

  
            query = strtok(buffer, " ");
  
            while(query){
  
                   printf("%s\t1\n", query);
  
                   query = strtok(NULL, " ");
  
            }
  
     }
  
     return 0;
  
}
  
h>h>h>h>
  

  Reduce: Reducer.c
  

  

  
#include
  
#include
  
#include
  
#include
  

  
#define BUFFER_SIZE     1024
  
#define DELIM   "\t"
  

  
int main(int argc, char *argv[]){
  
   char strLastKey[BUFFER_SIZE];
  
   char strLine[BUFFER_SIZE];
  
   int count = 0;
  

  
   *strLastKey = '\0';
  
   *strLine = '\0';
  

  
   while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){
  
          char *strCurrKey = NULL;
  
          char *strCurrNum = NULL;
  

  
          strCurrKey  = strtok(strLine, DELIM);
  
          strCurrNum = strtok(NULL, DELIM); /* necessary to check error but.... */
  

  
          if( strLastKey[0] == '\0'){
  
                 strcpy(strLastKey, strCurrKey);
  
          }
  

  
          if(strcmp(strCurrKey, strLastKey)){
  
                printf("%s\t%d\n", strLastKey, count);
  
                count = atoi(strCurrNum);
  
          }else{
  
                 count += atoi(strCurrNum);
  
          }
  
          strcpy(strLastKey, strCurrKey);
  

  
   }
  
   printf("%s\t%d\n", strLastKey, count); /* flush the count */
  
   return 0;
  
}
  
h>h>h>h>
  

  

  首先我们调试一下源码:
  

  
chmod +x mapper.py
  
chmod +x reducer.py
  
echo "foo foo quux labs foo bar quux" | ./mapper.py | ./reducer.py
  
bar     1
  
foo     3
  
labs    1
  
quux    2
  
g++ Mapper.c -o Mapper
  
g++ Reducer.c -o Reducer
  
chmod +x Mapper
  
chmod +x Reducer
  
echo "foo foo quux labs foo bar quux" | ./Mapper | ./Reducer
  
bar     1
  
foo     2
  
labs    1
  
quux    1
  
foo     1
  
quux    1
  

  

  你可能看到C的输出和Python的不一样,因为Python是把他放在词典里了.我们在Hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出.
  在Hadoop中运行程序
  首先我们要下载我们的测试文档wget http://www.gutenberg.org/dirs/etext04/7ldvc10.txt.我们把文档存放在/tmp/doc这个目录下.拷贝测试文档到HDFS中.
  

  
bin/hadoop dfs -copyFromLocal /tmp/doc doc
  

  

  运行 bin/hadoop dfs -ls doc 看看拷贝是否成功.接下来我们运行我们的MapReduce的Job.
  

  
bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar  -mapper /home/hadoop/Mapper\
  
-reducer /home/hadoop/Reducer  -input doc/* -output c-output -jobconf mapred.reduce.tasks=1
  additionalConfSpec_:null
  null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
  packageJobJar: [] [/home/msh/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob60816.jar tmpDir=null
  08/03/04 19:03:13 INFO mapred.FileInputFormat: Total input paths to process : 1
  08/03/04 19:03:13 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
  08/03/04 19:03:13 INFO streaming.StreamJob: Running job: job_200803031752_0003
  08/03/04 19:03:13 INFO streaming.StreamJob: To kill this job, run:
  08/03/04 19:03:13 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0003
  08/03/04 19:03:13 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0003
  08/03/04 19:03:14 INFO streaming.StreamJob:  map 0%  reduce 0%
  08/03/04 19:03:15 INFO streaming.StreamJob:  map 33%  reduce 0%
  08/03/04 19:03:16 INFO streaming.StreamJob:  map 100%  reduce 0%
  08/03/04 19:03:19 INFO streaming.StreamJob:  map 100%  reduce 100%
  08/03/04 19:03:19 INFO streaming.StreamJob: Job complete: job_200803031752_0003
  08/03/04 19:03:19 INFO streaming.StreamJob: Output: c-output
  bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar  -mapper /home/hadoop/mapper.py\
  -reducer /home/hadoop/reducer.py  -input doc/* -output python-output -jobconf mapred.reduce.tasks=1
  additionalConfSpec_:null
  null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
  packageJobJar: [] [/home/hadoop/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob26099.jar tmpDir=null
  08/03/04 19:05:40 INFO mapred.FileInputFormat: Total input paths to process : 1
  08/03/04 19:05:41 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
  08/03/04 19:05:41 INFO streaming.StreamJob: Running job: job_200803031752_0004
  08/03/04 19:05:41 INFO streaming.StreamJob: To kill this job, run:
  08/03/04 19:05:41 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0004
  08/03/04 19:05:41 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0004
  08/03/04 19:05:42 INFO streaming.StreamJob:  map 0%  reduce 0%
  08/03/04 19:05:48 INFO streaming.StreamJob:  map 33%  reduce 0%
  08/03/04 19:05:49 INFO streaming.StreamJob:  map 100%  reduce 0%
  08/03/04 19:05:52 INFO streaming.StreamJob:  map 100%  reduce 100%
  08/03/04 19:05:52 INFO streaming.StreamJob: Job complete: job_200803031752_0004
  08/03/04 19:05:52 INFO streaming.StreamJob: Output: python-output
  

  当Job提交后我们还能够在web的界面http://localhost:50030/看到每一个工作的运行情况。

  当Job工作完成后我们能够在c-output和python-output看到一些文件
  

  
bin/hadoop dfs -ls c-output
  

  

  输入下面的命令我们能够看到我们运行完MapReduce的结果
  

  
bin/hadoop dfs -cat c-output/part-00000
  

  

  用Hadoop Streaming运行MapReduce会比较用Java的代码要慢,因为有两方面的原因:


  • 使用 Java API >> C Streaming >> Perl Streaming 这样的一个流程运行会阻塞IO.
  • 不像Java在运行Map后输出结果有一定数量的结果集就启动Reduce的程序,用Streaming要等到所有的Map都运行完毕后才启动Reduce
  如果用Python编写MapReduce的话,另一个可选的是使用Jython来转编译Pyhton为Java的原生码.另外对于C程序员更好的选择是使用Hadoop新的C++ MapReduce API Pipes来编写.不管怎样,毕竟Hadoop提供了一种不使用Java来进行分布式运算的方法.
  下面是从http://www.lunchpauze.com/2007/10/writing-hadoop-mapreduce-program-in-php.html页面中摘下的用php编写的MapReduce程序,供php程序员参考:
  
Map: mapper.php
  

  

  
#!/usr/bin/php
  

  $word2count = array();
  // input comes from STDIN (standard input)
  while (($line = fgets(STDIN)) !== false) {
  // remove leading and trailing whitespace and lowercase
  $line = strtolower(trim($line));
  // split the line into words while removing any empty string
  $words = preg_split('/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
  // increase counters
  foreach ($words as $word) {
  $word2count[$word] += 1;
  }
  }
  // write the results to STDOUT (standard output)
  // what we output here will be the input for the
  // Reduce step, i.e. the input for reducer.py
  foreach ($word2count as $word => $count) {
  // tab-delimited
  echo $word, chr(9), $count, PHP_EOL;
  }
  ?>
  

  Reduce: mapper.php
  

  

  
#!/usr/bin/php
  

  $word2count = array();
  // input comes from STDIN
  while (($line = fgets(STDIN)) !== false) {
  // remove leading and trailing whitespace
  $line = trim($line);
  // parse the input we got from mapper.php
  list($word, $count) = explode(chr(9), $line);
  // convert count (currently a string) to int
  $count = intval($count);
  // sum counts
  if ($count > 0) $word2count[$word] += $count;
  }
  // sort the words lexigraphically
  //
  // this set is NOT required, we just do it so that our
  // final output will look more like the official Hadoop
  // word count examples
  ksort($word2count);
  // write the results to STDOUT (standard output)
  foreach ($word2count as $word => $count) {
  echo $word, chr(9), $count, PHP_EOL;
  }
  ?>
  




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-629226-1-1.html 上篇帖子: hadoop中启动namenode等出现的一些问题 下篇帖子: hadoop作业调度-资料
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表