设为首页 收藏本站
查看: 1598|回复: 0

[经验分享] Hadoop集群搭建及MapReduce应用

[复制链接]

尚未签到

发表于 2017-12-17 13:35:27 | 显示全部楼层 |阅读模式
  一、Hadoop集群的搭建与配置
  1、节点准备
  集群规划:
  主机名 IP 安装的软件 运行的进程
  weekend 01 192.168.1.60 jdk、hadoop NameNode、DFSZKFailoverController
  weekend 02 192.168.1.61 jdk、hadoop NameNode、DFSZKFailoverController
  weekend 03 192.168.1.62 jdk、hadoop ResourceManager
  weekend 04 192.168.1.63 jdk、hadoop ResourceManager
  weekend 05 192.168.1.64 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
  weekend 06 192.168.1.65 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
  weekend 07 192.168.1.66 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
  2、Java环境安装
  此部分主要描述Java的安装及环境配置。
  上传JDK软件包及hadoop相关软件包并解压,如下:
[hadoop@weekend01 src]$ pwd

  /usr/local/src
[hadoop@weekend01 src]$ ls

  hadoop-2.6.4 jdk1.8.0_92 zookeeper-3.4.6
  hbase-1.2.1 mongodb-linux-x86_64-rhel62-3.2.6 zookeeper-3.4.6.tar.gz
  授权给hadoop用户可以编辑/etc/profile文件,(此步骤非必要)
  添加如下内容至/etc/profile文件中
  export JAVA_HOME=/usr/local/src/jdk1.8.0_92
  export HADOOP_HOME=/usr/local/src/hadoop-2.6.4

  export>  export HBASE_HOME=/usr/local/src/hbase-1.2.1
  export ZK_HOME=/usr/local/src/zookeeper-3.4.6
  export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin:$PATH:$ZK_HOME/bin
  保存并退出并执行[hadoop@weekend01 ~]$ source /etc/profile(如不行,reboot就可以)然后验证java环境,和hadoop命令,
[hadoop@weekend01 ~]$ java -version

  java version "1.8.0_92"
  Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
  Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
[hadoop@weekend01 ~]$ hadoop version

  Hadoop 2.6.4
  Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 5082c73637530b0b7e115f9625ed7fac69f937e6
  Compiled by jenkins on 2016-02-12T09:45Z
  Compiled with protoc 2.5.0
  From source with checksum 8dee2286ecdbbbc930a6c87b65cbc010
  This command was run using /usr/local/src/hadoop-2.6.4/share/hadoop/common/hadoop-common-2.6.4.jar
  到此就成功配置了JDK和hadoop的环境
  3、SSH配置
  hadoop控制脚本依赖ssh来执行针对整个集群的操作,因此为了支持无缝工作,此部分主要描述SSH的配置。
  [hadoop@weekend01 ~]$ ssh-keygen
  下面直接回车就可以
  然后将公钥分别考别到自己和其他6个节点
  [hadoop@weekend01 ~]$ ssh-copy-id weekend01
  测试
  [hadoop@weekend01 ~]$ ssh weekend01
  Last login: Tue May 31 16:30:47 2016 from slave2
  [hadoop@weekend01 ~]$ exit
  logout
  Connection to weekend01 closed.
  [hadoop@weekend01 ~]$ ssh weekend02
  Last login: Tue Nov 8 11:33:42 2016 from master
  [hadoop@weekwnd02 ~]$ exit
  logout
  Connection to weekend02 closed.
  4、Hadoop配置
  此部分主要描述对Hadoop各个配置文件的修改。
  [hadoop@weekend01 hadoop]$ cat hdfs-site.xml
  <configuration>
  <property>
  <name>dfs.namenode.dir</name>
  <value>/weekend01/namenomde/dir</value>
  </property>
  <property>
  <name>dfs.blocksize</name>
  <value>268435456</value>
  </property>
  <property>
  <name>dfs.namenode.handler.count</name>
  <value>100</value>
  </property>
  <property>
  <name>dfs.datanode.data.dir</name>
  <value>/weekend01.datanode/dir</value>
  </property>
  </configuration>
  [hadoop@weekend01 hadoop]$ cat -n hadoop-env.sh
  26 export JAVA_HOME=/usr/local/src/jdk1.8.0_92
  [hadoop@weekend01 hadoop]$ cat core-site.xml
  <configuration>
  <!-- 指定hdfs的nameservice为weekend01 -->
  <property>
  <name>fs.defaultFS</name>
  <value>hdfs://192.168.1.60:9000</value>
  </property>
  <property>
  <name>io.file.buffer.size</name>
  <value>131072</value>
  </property>
  <!-- 指定hadoop临时目录 -->
  <!--<property>
  <name>hadoop.tmp.dir</name>
  <value>/home/hadoop/app/hadoop-2.4.1/tmp</value>
  </property>
  -->
  <!-- 指定zookeeper地址 -->
  <!--<property>
  <name>ha.zookeeper.quorum</name>
  <value>weekend05:2181,weekend06:2181,weekend07:2181</value>
  </property>-->
  </configuration>
[hadoop@weekend01 hadoop]$ cat mapred-site.xml

  <configuration>
  <!-- 指定mr框架为yarn方式 -->
  <property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>`
  </property>
  </configuration>
  [hadoop@weekend01 hadoop]$ cat slaves
  weekend05
  weekend06
  weekend07
  [hadoop@weekend01 hadoop]$ cat yarn-site.xml
  <configuration>
  <!-- Site specific YARN configuration properties -->
  <property>
  <name>yarn.acl.enable</name>
  <value>false</value>
  </property>
  <property>
  <name>yarn.admin.acl</name>
  <value>*</value>
  </property>
  <property>
  <name>yarn.log-aggreationanable</name>
  <value>false</value>
  </property>
  <property>
  <name>yarn.resourcemanager.address</name>
  <value>192.168.1.60:9000</value>
  </property>
  <property>
  <name>yarn.resourcemanager.scheduler.address</name>
  <value>192.168.1.60:9000</value>
  </property>
  <property>
  <name>yarn.resourcemanager.resourcetracker.address</name>
  <value>192.168.1.60:9000</value>
  </property>
  <property>
  <name>yarn.resourcemanager.admin.address</name>
  <value>192.168.1.60:9000</value>
  </property>
  <property>
  <name>yarn.resourcemanager.webapp.address</name>
  <value>192.168.1.60:9000</value>
  </property>
  <property>
  <name>yarn.resourcemanager.hostname</name>
  <value>weekend01</value>
  </property>
  <property>
  <name>yarn.resourcemanager.scheduler,class</name>
  <value>CapacityScheduler</value>
  </property>
  <property>
  <name>yarn.</name>
  <value></value>
  </property>
  <property>
  <name>yarn.resourcemanager.</name>
  <value></value>
  </property>
  <property>
  <name>yarn.resourcemanager.</name>
  <value></value>
  </property>
  <property>
  <name>yarn.resourcemanager.</name>
  <value></value>
  </property>
  <property>
  <name>yarn.resourcemanager.</name>
  <value></value>
  </property>
  </configuration>
  5、Hadoop测试
  此部分主要对Hadoop进行测试。
  格式化HDFS文件系统,在weekend01上
[hadoop@weekend01 ~]$ hdfs namenode -format

  …..
  16/11/08 11:00:37 INFO common.Storage: Storage directory /tmp/hadoop-hadoop/dfs/name has been successfully formatted #这句话是格式化成功的标志
  …….
  然后在weekend05、weekend06、weekend07上分别启动zookeeper集群和journalnode进程
  [hadoop@weekend05 ~]$ zkServer.sh start
  JMX enabled by default
  Using config: /usr/local/src/zookeeper-3.4.6/bin/../conf/zoo.cfg
  Starting zookeeper ... STARTED
  [hadoop@weekend05 ~]$ hadoop-daemon.sh start journalnode
  starting journalnode, logging to /usr/local/src/hadoop-2.6.4/logs/hadoop-hadoop-journalnode-weekend05.out
  [hadoop@weekend05 ~]$ jps #看到这三个进程标志着启动成功
  4833 JournalNode
  4881 Jps
  4779 QuorumPeerMain
  然后在weekend01上格式化ZKFC
  [hadoop@weekend01 ~]$ hdfs zkfc -formatZK
  [hadoop@weekend01 ~]$ jps
  4420 SecondaryNameNode
  5148 Jps
  4141 NameNode
  [hadoop@weekend03 ~]$ start-yarn.sh
  [hadoop@weekend03 ~]$ jps
  4837 ResourceManager
  4902 Jps
  [hadoop@weekend01 ~]$ start-dfs.sh
  [hadoop@weekend05 ~]$ jps
  4833 JournalNode
  4948 DataNode
  5530 Jps
  4779 QuorumPeerMain
  5403 NodeManager
  [hadoop@weekend06 ~]$ jps
  2464 QuorumPeerMain
  5448 NodeManager
  2520 JournalNode
  4953 DataNode
  5581 Jps
  [hadoop@weekend07 ~]$ jps
  4864 QuorumPeerMain
  5713 Jps
  5581 NodeManager
  5086 DataNode
  4974 JournalNode
DSC0000.png

  二、MapReduce应用
   1、应用描述
  使用hadoop进行数据统计,并做去重处理,该实验由于采用高可用避免了集群的单点故障,可以有效避免由于namenode单点故障引起的集群崩溃
  2、数据准备
DSC0001.png

  呼出终端,输入下面指令:
  bin/hadoop fs -mkdir hdfsInput
  执行这个命令时可能会提示类似安全的问题,如果提示了,请使用
  bin/hadoop dfsadmin -safemode leave
  来退出安全模式。
  意思是在HDFS远程创建一个输入目录,我们以后的文件需要上载到这个目录里面才能执行。
  在终端依次输入下面指令:
  cd hadoop-1.2.1
  bin/hadoop fs -put file/myTest*.txt hdfsInput
DSC0002.png

  3、设计思路
  1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,key为偏移量(包括了回车符),value为文本行。这一步由MapReduce框架自动完成,如下图:
DSC0003.png

  2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如下图所示:
DSC0004.png

  3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果。如下图:
DSC0005.png

  4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如下图:
DSC0006.png

  4、程序代码
  package com.felix;
  import java.io.IOException;
  import java.util.Iterator;
  import java.util.StringTokenizer;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.Mapper;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reducer;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.TextInputFormat;
  import org.apache.hadoop.mapred.TextOutputFormat;
  /**
  *
  * 描述:WordCount explains by Felix
  * @author Hadoop Dev Group
  */

  public>  {
  /**
  * MapReduceBase类:实现了Mapper和Reducer接口的基类(其中的方法只是实现接口,而未作任何事情)
  * Mapper接口:
  * WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类应该实现此接口。
  * Reporter 则可用于报告整个应用的运行进度,本例中未使用。
  *
  */

  public static>  Mapper<LongWritable, Text, Text, IntWritable>
  {
  /**
  * LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类实现了WritableComparable接口,
  * 都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String 的替代品。
  */
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  /**
  * Mapper接口中的map方法:
  * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
  * 映射一个单个的输入k/v对到一个中间的k/v对
  * 输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。
  * OutputCollector接口:收集Mapper和Reducer输出的<k,v>对。
  * OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output
  */
  public void map(LongWritable key, Text value,
  OutputCollector<Text, IntWritable> output, Reporter reporter)
  throws IOException
  {
  String line = value.toString();
  StringTokenizer tokenizer = new StringTokenizer(line);
  while (tokenizer.hasMoreTokens())
  {
  word.set(tokenizer.nextToken());
  output.collect(word, one);
  }
  }
  }

  public static>  Reducer<Text, IntWritable, Text, IntWritable>
  {
  public void reduce(Text key, Iterator<IntWritable> values,
  OutputCollector<Text, IntWritable> output, Reporter reporter)
  throws IOException
  {
  int sum = 0;
  while (values.hasNext())
  {
  sum += values.next().get();
  }
  output.collect(key, new IntWritable(sum));
  }
  }
  public static void main(String[] args) throws Exception
  {
  /**
  * JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作
  * 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等
  */
  JobConf conf = new JobConf(WordCount.class);
  conf.setJobName("wordcount"); //设置一个用户定义的job名称
  conf.setOutputKeyClass(Text.class); //为job的输出数据设置Key类
  conf.setOutputValueClass(IntWritable.class); //为job输出设置value类
  conf.setMapperClass(Map.class); //为job设置Mapper类
  conf.setCombinerClass(Reduce.class); //为job设置Combiner类
  conf.setReducerClass(Reduce.class); //为job设置Reduce类
  conf.setInputFormat(TextInputFormat.class); //为map-reduce任务设置InputFormat实现类
  conf.setOutputFormat(TextOutputFormat.class); //为map-reduce任务设置OutputFormat实现类
  /**
  * InputFormat描述map-reduce中对job的输入定义
  * setInputPaths():为map-reduce job设置路径数组作为输入列表
  * setInputPath():为map-reduce job设置路径数组作为输出列表
  */
  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  JobClient.runJob(conf); //运行一个job
  }
  }
  package com.hadoop.sample;
  import java.io.IOException;
  import java.util.StringTokenizer;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.util.GenericOptionsParser;

  public>  //继承mapper接口,设置map的输入类型为<Object,Text>
  //输出类型为<Text,IntWritable>

  public static>  //one表示单词出现一次
  private static IntWritable one = new IntWritable(1);
  //word存储切下的单词
  private Text word = new Text();
  public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
  //对输入的行切词
  StringTokenizer st = new StringTokenizer(value.toString());
  while(st.hasMoreTokens()){
  word.set(st.nextToken());//切下的单词存入word
  context.write(word, one);
  }
  }
  }
  //继承reducer接口,设置reduce的输入类型<Text,IntWritable>
  //输出类型为<Text,IntWritable>

  public static>  //result记录单词的频数
  private static IntWritable result = new IntWritable();
  public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
  int sum = 0;
  //对获取的<key,value-list>计算value的和
  for(IntWritable val:values){
  sum += val.get();
  }
  //将频数设置到result
  result.set(sum);
  //收集结果
  context.write(key, result);
  }
  }
  /**
  * @param args
  */
  public static void main(String[] args) throws Exception{
  // TODO Auto-generated method stub
  Configuration conf = new Configuration();
  //检查运行命令
  String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
  if(otherArgs.length != 2){
  System.err.println("Usage WordCount <int> <out>");
  System.exit(2);
  }
  //配置作业名
  Job job = new Job(conf,"word count");
  //配置作业各个类
  job.setJarByClass(WordCount.class);
  job.setMapperClass(Map.class);
  job.setCombinerClass(Reduce.class);
  job.setReducerClass(Reduce.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
  }
  5、运行结果用户登录
  在终端输入下面指令:
  bin/hadoop jar hadoop-examples-1.2.1.jar wordcount hdfsInput hdfsOutput
  注意,这里的示例程序是1.2.1版本的,可能每个机器有所不一致,那么请用*通配符代替版本号
  bin/hadoop jar hadoop-examples-*.jar wordcount hdfsInput hdfsOutput
  应该出现下面结果:
DSC0007.png

  Hadoop命令会启动一个JVM来运行这个MapReduce程序,并自动获得Hadoop的配置,同时把类的路径(及其依赖关系)加入到Hadoop的库中。以上就是Hadoop Job的运行记录,从这里可以看到,这个Job被赋予了一个ID号:job_201202292213_0002,而且得知输入文件有两个(Total input paths to process : 2),同时还可以了解map的输入输出记录(record数及字节数),以及reduce输入输出记录。
  查看HDFS上hdfsOutput目录内容:
  在终端输入下面指令:
  bin/hadoop fs -ls hdfsOutput
DSC0008.png

  从上图中知道生成了三个文件,我们的结果在"part-r-00000"中。
  使用下面指令查看结果输出文件内容
  bin/hadoop fs -cat output/part-r-00000
DSC0009.png

  注:文中部分图引自其他博文,此文仅作为实验笔记!如有侵犯请邮箱至1255560195@qq.com,未经允许不得转载!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425023-1-1.html 上篇帖子: Hadoop完全分布分布式配置 下篇帖子: Hadoop源码分类概要整理
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表