设为首页 收藏本站
查看: 1357|回复: 0

[经验分享] Storm安装配置及Real-Life实例

[复制链接]

尚未签到

发表于 2015-11-26 12:09:32 | 显示全部楼层 |阅读模式
  最近找工作看到很多大数据处理的基本都是要求Hadoop、MapReduce之类的,其中如果有熟悉Storm、Spark的会有加分。Storm、Spark属于大数据实时处理的框架,MapReduce是属于离线的、非实时的。这几天都在看Storm,(Spark之前有看过,不过当时耽搁了,不是很深入,之前看的是《Fast Data Processing with Spark》)想记录一下自己的学习过程,希望可以和同学们共同探讨。。。
  看一个技术框架,或许首先应该先google下,看下别人写的博客,技术文章等,大概了解下,如果英语过关,可以直接看官网文档(不过个人感觉storm的文档有点不是很好,根据其提供的例子弄了好些天才搞定,当然或许是自己能力太差也说不定);然后就是搭建集群(Storm集群、当然单机也可以学习的);运行自己的第一个“Word Count”程序(大数据的“hello world”?);结合一些讲原理的书籍或者博客看WordCount的代码,对照理解;尝试自己写代码,并运行,总结经验;之后就是慢慢的积累过程了!(以上纯属个人观点)
  Storm简介:http://www.searchtb.com/2012/09/introduction-to-storm.html , 一篇阿里的技术博客,感觉不错(有一定Hadoop基础看着理解会好点)。里面讲到了Storm记录级容错的原理,看的不是很明白(能力不够呀!)。
  一、Storm安装配置
  首先参考了官网的配置:https://storm.apache.org/documentation/Setting-up-development-environment.html ,跑着有问题。接着,网上找了一篇:http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/,这个感觉靠谱点(我就是参考这个配置的)。
  集群配置:
  node101 : CentOS6.5  64bit、2G 内存、nimbus、ui、zookeeper(虚拟机)
  node102 : CentOS6.5  64bit、1G 内存、supervisor(虚拟机)
  node103 : CentOS6.5  64bit、1G 内存、supervisor(虚拟机)

  版本: Storm:0.9.3 、Zookeeper:zookeeper-3.4.6
  1. 安装并启动Zookeeper
  1)下载Zookeeper,解压到/opt文件夹(这个文件夹可以自定义)
  2)进入解压后的bin目录执行./zkServer.sh start ;
  3)查看是否启动:
  
[iyunv@node101 bin]# ./zkServer.sh status
JMX enabled by default
Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone2. 安装并配置Storm:  
  (jdk不用说了,自己安装吧)
  1)下载ZeroMQ、JZMQ并安装
  下载地址:https://github.com/downloads/saltstack/salt/zeromq-2.1.7-1.el6.x86_64.rpm 、https://s3.amazonaws.com/cdn.michael-noll.com/rpms/jzmq-2.1.0.el6.x86_64.rpm。 这两个rpm官网没有说要安装,不安装的话,后面可能会有问题。(我现在都不清楚后面出现的问题是否是因为这个没有安装)
  
yum install zero*
yum install jzmq*  
  2)下载Storm,并配置;
  在Storm官网下载0.9.3版本,并解压到/opt目录。修改conf/storm.yaml如下:
  
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "node101"
#     - "server2"
#
#
storm.local.dir: "/opt/data/storm"
nimbus.host: "node101"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703  
  然后使用scp把这个文件拷贝到node102、node103机器对应的文件夹中
3)启动storm相关进程  可以在bin目录建立一个start.sh文件,并赋予执行权限;在nimbus机器建立的文件内容如下:
  
#!/bin/bash
/opt/apache-storm-0.9.3/bin/storm nimbus > /dev/null 2>&1 &
/opt/apache-storm-0.9.3/bin/storm ui > /dev/null 2>&1 &在supervisor机器建立的文件如下:  
  
#!/bin/bash
/opt/apache-storm-0.9.3/bin/storm supervisor > /dev/null 2>&1 &当然,也可以参考http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/使用supervision把Storm的相关进程管理起来,这样就可以直接使用service命令了。  
  分别在node101、node102、node103中执行./start.sh 即可启动Storm集群。
  4) ui监控
  浏览器访问:http://node101:8080 ,即可看到集群的监控,如下:
  
第一次启动的时候在Topology summary中不会有内容。

  这样集群就启动了!
  二、Storm第一个程序WordCount
  WordCount程序参考《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》
  1) 下载https://github.com/storm-book/examples-ch02-getting_started/zipball/master ,导入到自建的java工程中,新建的java工程需要导入Storm的相关包。
  2)修改WordCount使其可以在集群中运行:
  a. 修改TopologyMain.java

package main;
import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;

public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
if(args.length!=4){
System.out.println(&quot;<input> <threadtime> <parallel_wc> <local|distribute>&quot;);
System.exit(-1);
}
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(&quot;word-reader&quot;,new WordReader());
builder.setBolt(&quot;word-normalizer&quot;, new WordNormalizer())
.shuffleGrouping(&quot;word-reader&quot;);
builder.setBolt(&quot;word-counter&quot;, new WordCounter(),Integer.parseInt(args[2]))
.fieldsGrouping(&quot;word-normalizer&quot;, new Fields(&quot;word&quot;));
//Configuration
Config conf = new Config();
conf.put(&quot;wordsFile&quot;, args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
if(args[3].equals(&quot;local&quot;)){
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(&quot;Getting-Started-Toplogie&quot;, conf, builder.createTopology());
try{
Thread.sleep(Long.parseLong(args[1]));
}catch(Exception e){
e.printStackTrace();
}
cluster.shutdown();
}else if (&quot;distribute&quot;.equals(args[3])){
try {
StormSubmitter.submitTopology(&quot;wc1-1&quot;, conf,
builder. createTopology());
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else{
System.out.println(&quot;Wrong mode!&quot;);
System.exit(-1);
}
System.out.println(new java.util.Date()+&quot;: 任务已经提交到Strom集群!&quot;);
}
}
  
  这里的提交方式可以选择两种,一种是Local模式一种是分布式的。
b. WordCounter.java 修改  
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String str = input.getString(0);
/**
* If the word dosn't exist in the map we will create
* this, if not We will add 1
*/
if(!counters.containsKey(str)){
counters.put(str, 1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
System.out.println(&quot;Counter.size:&quot;+counters.size()+&quot;,input:&quot;+str);
}
修改其exec方法,在最后加上一句打印即可;
  
  3) 打包运行:
  使用eclipse的export把代码打包到jar文件中,在node101中使用storm jar提交任务:
  
storm jar /opt/storm_user_lib/wc1.1.jar main.TopologyMain /opt/wc.txt 5000 2 distribute这里需要先在node102、node103机器的/opt目录下新建wc.txt文档,不然会报文件找不到的错误(其实只用在运行的节点上运行即可);  
  wc.txt:(可以自定义)
  
we are the future
see me
ok
see you
because what i am doing
what the fuck  
  4)查看结果:
  在storm 监控界面找到名字为 wc1-1的Topology,找到其wordcount的bolt,查看其运行节点及worker端口,如下

  在节点node102中的logs中查看work-6702.log文件:

  但是这里并没有看到cleanup的打印信息代码,这个是因为cleanup会在集群关闭的时候调用,所以,如果你把集群关闭就可以看到打印的结果了。(如何关闭集群?jps 查看相关的进程,kill -9直接杀掉)。
  三、Storm Real-Life 例子
  参考《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》 chapter6
  版本:Storm:0.9.3 ,Redis:2.8.19; Nodejs:0.12.0;jedis:2.6.2;
  1)下载、编译、安装Redis、Nodejs:
  redis
  下载地址:http://redis.io/download
  安装出错:http://www.cnblogs.com/la-isla-bonita/p/3582751.html
nodejs
  下载地址:https://nodejs.org/download/

  jedis
  下载地址:https://github.com/xetorthio/jedis/releases ,下载源码使用mvn编译(使用mvn clean install -Dmaven.test.skip=true命令)
  2)下载源码,并打包:
  下载地址:https://github.com/storm-book/examples-ch06-real-life-app。
  下载源码并导入后,如果没有使用mvn构建的话,会报错。首先把jedis jar包加入classpath路径中,同时也需要把jedis jar包加入到storm_home/lib/下面(三个节点都需要,其实应该只用两个supervisor的就行吧);
  由于版本的原因,所以会有些地方报错,一般包括下面三个部分:
  a. TopologyStarter.java :
  修改redis、nodejs的机器名
  
public final static String REDIS_HOST = &quot;node101&quot;;
public final static int REDIS_PORT = 6379;
public final static String WEBSERVER = &quot;http://node101:3000/news&quot;;去掉下面这句,应该是在21行左右;  
  
//        Logger.getRootLogger().removeAllAppenders();  
  由于虚拟机资源有限,所以把所有的并行都改成了2;
  
builder.setSpout(&quot;read-feed&quot;, new UsersNavigationSpout(), 2);
builder.setBolt(&quot;get-categ&quot;, new GetCategoryBolt(), 2)
.shuffleGrouping(&quot;read-feed&quot;);
builder.setBolt(&quot;user-history&quot;, new UserHistoryBolt(), 2)
.fieldsGrouping(&quot;get-categ&quot;, new Fields(&quot;user&quot;));
builder.setBolt(&quot;product-categ-counter&quot;, new ProductCategoriesCounterBolt(), 2)
.fieldsGrouping(&quot;user-history&quot;, new Fields(&quot;product&quot;));提交任务改为集群提交:  
  
//        LocalCluster cluster = new LocalCluster();
//        cluster.submitTopology(&quot;analytics&quot;, conf, builder.createTopology());
StormSubmitter.submitTopology(&quot;analytics&quot;, conf,
builder. createTopology());b. ProductCategoriesCounterBolt.java、utilities/ProductsReader.java
由于jedis使用中如果有超时,那就会报下面的错误:  
  
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out

java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to [B一种改法是加大time out的时间,修改上面两个类中的reconnect方法
public void reconnect() {
//this.jedis = new Jedis(host, port);
this.jedis = new Jedis(host, port,10000); // 修改默认time out 到10s
}
c. NewsNotifierBolt.java  
  经过b。a的修改后,还会报错,需要修改上面这个类,把http相关的包改为storm下面的http包相关类
  
import org.apache.storm.http.HttpResponse;
import org.apache.storm.http.client.HttpClient;
import org.apache.storm.http.client.methods.HttpPost;
import org.apache.storm.http.entity.StringEntity;
import org.apache.storm.http.impl.client.DefaultHttpClient;
经过a、b、c的修改,现在可以把源码打成jar包并运行了!  
  

  3) 运行,并查看结果
  在node101中运行redis: nohup redis-server & ;
  运行jar包:storm jar real-life1.1.jar storm.analytics.TopologyStarter ;
  运行Nodejs:node webapp/app.js;
  
在浏览器打开http://node101:3000,即可看到下图
点击某个产品,接着查看相关条目,多点击几次,就会有数据了,如下:  


  同时,查看Storm的监控,可以看到Storm的spout和bolt模块的传输数据记录:


  实例来自《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》。
  例子跑完后,可以开始具体分析各个步骤了,暂时不想分析real-life的例子,有点复杂,可以先分析wordcount,同时结合《Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf》这本电子书。
  

  

  

  分享,成长,快乐
脚踏实地,专注
转载请注明blog地址:http://blog.csdn.net/fansy1990

  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-143866-1-1.html 上篇帖子: Node groups和Compound matchers(混合匹配) 下篇帖子: salt内置执行模块列表
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表