设为首页 收藏本站
查看: 934|回复: 0

[经验分享] Hadoop MapReduce计算框架

[复制链接]

尚未签到

发表于 2018-10-28 14:20:52 | 显示全部楼层 |阅读模式
1、MapReduce理论

1.1、MapReduce是什么?
  MapReduce用于处理海量数据的分布式计算框架,是Hadoop生态中的核心之一(MapReduce用于计算海量数据,HDFS用于存储海量数据);MapReduce是谷歌公司在研究如何处理海量数据所提出的一种面向大规模数据处理的并行计算模型和方法。

1.2、MapReduce概述
  MapReduce是一个计算框架,用于对大数据进行处理,它的主要思想就是“分而治之”;整个MapReduce计算过程可以分为Map(映射)阶段和Reduce(缩减阶段);一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对Map的输出先进行排序, 然后把结果输入给Reduce任务。通常作业的输入和输出都会被存储在文件系统(HDFS)中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
  Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。
  应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。
  一个Map/Reduce 作业的输入和输出类型如下所示:

  (input)  -> map ->  -> combine ->  -> reduce ->  (output)

  虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用 Java来写 。


  • Hadoop Streaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序 (例如:Shell工具)来做为mapper和reducer。
  • Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。
1.3、MapReduce核心功能描述
DSC0000.jpg

DSC0001.jpg


1.3.1、MapReduce执行流程


  •   Step 1:从本地HDFS读取文件输入的内容,每个输入文件被切分成一定大小的数据块Block(1.0版本默认64M);

  •   Step 2:每个被切分的数据块Block会产生对应的Map任务,用户可以定义map函数,对被切分的数据使用map函数解析成一个key/value格式的数据;

  •   Step 3:对每个map任务得到的key/value格式的数据按照不同的分区,通过网络传输到不同的Reduce节点;

  •   Step 4:对多个map任务输出的结果进行排序、合并,然后通过reduce函数进程处理得到最新的key/value结果;

  • Step 5:将Reduce输出的结果保存到文件系统中。
1.3.2、Map和Reduce的任务数量
  在整个hadoop的MapReduce计算过程中,需要多少个map和reduce呢?


  • map的任务数量
  Map的任务数量通常是由输入数据的大小决定的,一般就是所有输入文件的总块(block)数。
  Map正常的并行规模大致是每个节点(node)大约10到100个map,对于CPU 消耗较小的map任务可以设到300个左右。由于每个任务初始化需要一定的时间,因此,比较合理的情况是map执行的时间至少超过1分钟;
  例如:输入10TB的数据,每个块(block)的大小是128MB,你将需要大约82,000个map来完成任务,通过 setNumMapTasks(int)可以修改map数值。


  • reduce的任务数量
  Reduce的任务数量建议是0.95或1.75乘以 ( * mapred.tasktracker.reduce.tasks.maximum)。
  用0.95,所有reduce可以在maps一完成时就立刻启动,开始传输map的输出结果。用1.75,速度快的节点可以在完成第一轮reduce任务后,可以开始第二轮,这样可以得到比较好的负载均衡的效果。
  增加reduce的数目会增加整个框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。

2、MapReduce案例

2.1、使用shell命令演示“mapper”和“reducer”
  对一个英文文本的单词进行统计,shell命令行处理思路如下:


  • 打开文件:cat file
  • 将文本的空格替换成换行符:tr ' ' '\n'
  • 将替换空格文本的单词进行排序: sort -k 1
  • 将排序后的文本显示每行出现单词的数字:uniq -c
  • 将文件中的数字(value)和单词(key)互换位置:awk '{print $2"\t"$1}':
  • 将替换键值的文本相同的词进行汇总并按大小进行排序:sort -k2 -nr
  • 将结果保存为新的文件:newfile
  以上步骤中,第2-5步相当于mapper,第6步为reducer。
  对文本The_Man_of_Property的单词进行统计;
  

$ cat The_Man_of_Property |tr ' ' '\n'|sort -k 1|uniq -c|awk '{print $2 "\t"$1}'|sort -k2 -nr|head -n 20  
the 5144
  
of  3407
  
to  2782
  
and 2573
  
a   2543
  
he  2139
  
his 1912
  
was 1702
  
in  1694
  
had 1526
  
that    1273
  
with    1029
  
her 1020
  
—   931
  
at  815
  
for 765
  
not 723
  
she 711
  
He  695
  
it  689
  

  以上显示了前20行的结果,需要将计算的结果保存下来,追加到文件即可
  

$ cat The_Man_of_Property |tr ' ' '\n'|sort -k 1|uniq -c|awk '{print $2 "\t"$1}'|sort -k2 -nr > wordcountretult.txt  

2.2、使用Streaming运行MapReducer程序
  使用python分别编写mapper和reducer,通过Hadoop Streaming运行程序;

2.2.1 Hadoop Streaming
  Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。例如:
  

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /bin/wc
  -file /bin/cat
  -file /bin/cat
  


  • -input:输入文件的路径,从HDFS上读取文件
  • -output:计算结果的输出路径,保存到HDFS上
  • -mapper:可执行的程序或脚本
  • -reducer:可执行的程序或脚本
2.2.2 wordcount by Python
  使用Python编写“mapper.py”和“reducer.py”:


  • mapper.py从标准输入读取文件;
  • mapper.py将文本的单词处理成key/value的形式;
  • reducer.py的标准输入就是mapper.py的输出;
  • reducer.py将读入的单词进行归并整理,最终梳理为所有单词的汇总输出到文件;
  注:以下的程序运行在hadoop 1.2.1上;

2.2.2.1、mapper by python
  vim mapper.py
  

#!/usr/bin/env python  

  
import sys
  

  
for line in sys.stdin:
  line = line.strip()
  words = line.split()
  for word in words:
  print "%s\t%s" %(word, 1)
  

  从标准输入读入文件,遍历每一行字符;并对每一行字符去除特殊字符,然后以空格作为分隔符进行遍历输出word:1的key/value的格式;
  文本The_Man_of_Property执行maper.py程序,显示前10行:
  

$ cat The_Man_of_Property |python mapper.py |head  
Preface 1
  
“The    1
  
Forsyte 1
  
Saga”   1
  
was 1
  
the 1
  
title   1
  
originally  1
  
destined    1
  
for 1
  

2.2.2.2、reducer by python
  mapper输出作为reducer的输入,因此,reducer需要对word:1的格式进行处理,排序、相同单词归并,最终输出每个单词的汇总数据;
  vim reducer.py
  

#!/usr/bin/env python  

  
import sys
  

  
current_word = None
  
word_sum = 0
  

  
for line in sys.stdin:
  word_list = line.strip().split('\t')
  if len(word_list) < 2:
  continue
  word = word_list[0].strip()
  word_value = word_list[1].strip()
  

  if current_word == None:
  current_word = word
  if current_word != word:
  print "%s\t%s" %(current_word, str(word_sum))
  current_word = word
  word_sum = 0
  word_sum += int(word_value)
  

  
print "%s\t%s" %(current_word, str(word_sum))
  
word_sum = 0
  

  

  reducer从mapper标准输出读入数据,对相同单词进行汇总;

2.2.2.3、本地测试mapper和reducer
  在本地上先测试的流程方法:cat inputfile|mapper.py|sort|reducer.py
  cat The_Man_of_Property |python mapper.py |sort -k1|python reducer.py

2.2.2.4、hadoop(hadoop-1.2.1)上测试


  • 文本The_Man_of_Property上传到HDFS上
  • mapper.py和reducer.py增加执行权限
  

hadoop fs -mkdir /user/input  
hadoop fs -mkdir /user/output
  
hadoop fs -copyFromLocal The_Man_of_Property /user/input/
  
chmod +x mapper.py reducer.py
  


  • 编写运行streaming的shell脚本:  vim run.sh

  

#!/bin/bash  

  
HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-1.2.1/bin/hadoop"
  
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
  

  
INPUT_FILE_PATH="/user/input/The_Man_of_Property"
  
OUTPUT_FILE_PATH="/user/output/output00"
  

  
#
  
$HADOOP_CMD jar $STREAM_JAR_PATH \
  -input $INPUT_FILE_PATH \
  -output $OUTPUT_FILE_PATH \
  -mapper "python mapper.py" \
  -reducer "python reducer.py" \
  -file ./mapper.py \
  -file ./reducer.py
  

  


  • 给run.sh增加执行权限,运行程序:
  

chmod +x run.sh  
./run.sh
  
packageJobJar: [./mapper.py, ./reducer.py, /home/hadoop/app/hadoop/hadoop-1.2.1/tmp/hadoop-unjar8638446041851116131/] [] /tmp/streamjob3031434578661633957.jar tmpDir=null
  
18/01/16 13:49:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
  
18/01/16 13:49:17 WARN snappy.LoadSnappy: Snappy native library not loaded
  
18/01/16 13:49:17 INFO mapred.FileInputFormat: Total input paths to process : 1
  
18/01/16 13:49:17 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/app/hadoop/hadoop-1.2.1/tmp/mapred/local]
  
18/01/16 13:49:17 INFO streaming.StreamJob: Running job: job_201801150959_0008
  
18/01/16 13:49:17 INFO streaming.StreamJob: To kill this job, run:
  
18/01/16 13:49:17 INFO streaming.StreamJob: /home/hadoop/app/hadoop/hadoop-1.2.1/libexec/../bin/hadoop job  -Dmapred.job.tracker=http://192.168.30.50:9001 -kill job_201801150959_0008
  
18/01/16 13:49:17 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_201801150959_0008
  
18/01/16 13:49:18 INFO streaming.StreamJob:  map 0%  reduce 0%
  
18/01/16 13:49:25 INFO streaming.StreamJob:  map 100%  reduce 0%
  
18/01/16 13:49:33 INFO streaming.StreamJob:  map 100%  reduce 33%
  
18/01/16 13:49:36 INFO streaming.StreamJob:  map 100%  reduce 100%
  
18/01/16 13:49:38 INFO streaming.StreamJob: Job complete: job_201801150959_0008
  
18/01/16 13:49:38 INFO streaming.StreamJob: Output: /user/output/output00
  

  


  • 查看输出结果  

    hadoop fs -ls /user/output/output00  
    Found 3 items
      
    -rw-r--r--   2 hadoop supergroup          0 2018-01-16 13:49 /user/output/output00/_SUCCESS
      
    drwxr-xr-x   - hadoop supergroup          0 2018-01-16 13:49 /user/output/output00/_logs
      
    -rw-r--r--   2 hadoop supergroup     181530 2018-01-16 13:49 /user/output/output00/part-00000
      

      计算的结果输出到文件:/user/output/output00/part-00000,可以将文件下载到本地,和使用shell命令处理的结果是相同的。
      

    $ hadoop fs -get /user/output/output00/part-00000 ./  
    $ cat part-00000 |sort -k2 -nr|head
      
    the 5144
      
    of  3407
      
    to  2782
      
    and 2573
      
    a   2543
      
    he  2139
      
    his 1912
      
    was 1702
      
    in  1694
      
    had 1526

  

$ head wordcountretult.txt  
the 5144
  
of  3407
  
to  2782
  
and 2573
  
a   2543
  
he  2139
  
his 1912
  
was 1702
  
in  1694
  
had 1526
  

3、遇到的问题
  使用streaming运行python编写的mapper和reducer的是出现了如下错误:
  

ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask  

  因为本地验证正常,可以确定的是mapper.py和reducer.py程序应该没有问题;使用 -file 指定mapper和reducer的路径(相对路径或者绝对路径都可以)再一次运行,没有报错了。



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-627585-1-1.html 上篇帖子: hadoop集群搭建(三)配置ubermo模式 下篇帖子: Hadoop 2.7.5 集群搭建基于CentOS7u3-balich
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表