public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        if (args.length != 4) {
            System.out.println("<input> <threadtime> <parallel_wc> <local|distribute>");
            System.exit(-1);
        }
        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
        // args[2] is the parallelism of the counter bolt
        builder.setBolt("word-counter", new WordCounter(), Integer.parseInt(args[2]))
                .fieldsGrouping("word-normalizer", new Fields("word"));
        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);
        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        if ("local".equals(args[3])) {
            // Local mode: run for args[1] milliseconds, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
            try {
                Thread.sleep(Long.parseLong(args[1]));
            } catch (Exception e) {
                e.printStackTrace();
            }
            cluster.shutdown();
        } else if ("distribute".equals(args[3])) {
            // Distributed mode: submit to the running Storm cluster
            try {
                StormSubmitter.submitTopology("wc1-1", conf,
                        builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("Wrong mode!");
            System.exit(-1);
        }
        System.out.println(new java.util.Date() + ": topology submitted to the Storm cluster!");
    }
}
There are two ways to submit the topology here: local mode and distributed mode.
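The topology routes words to the counter bolt with a fields grouping on "word", so every occurrence of a given word reaches the same counter task regardless of the parallel_wc value. The plain-Java sketch below illustrates that idea only. The class FieldsGroupingSketch and the method taskFor are hypothetical, and the hash-modulo rule is an assumption used for illustration, not a copy of Storm's internal code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Storm: models how a fields grouping on
// "word" could map a word to one of numTasks counter tasks.
class FieldsGroupingSketch {

    // Assumption: task index = hash of the grouping field modulo the task count.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int parallelWc = 2; // plays the role of args[2] in TopologyMain
        List<String> words = Arrays.asList("see", "me", "see", "you");
        for (String w : words) {
            // Both occurrences of "see" print the same task index.
            System.out.println(w + " -> word-counter task " + taskFor(w, parallelWc));
        }
    }
}
```

Because each word is always handled by the same task, the per-task counters map in WordCounter never has to be merged with the maps of other tasks.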
b. Changes to WordCounter.java
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    String str = input.getString(0);
    /**
     * If the word doesn't exist in the map we create an entry
     * for it; otherwise we add 1 to its count
     */
    if (!counters.containsKey(str)) {
        counters.put(str, 1);
    } else {
        Integer c = counters.get(str) + 1;
        counters.put(str, c);
    }
    // Added line: print the map size and the current word
    System.out.println("Counter.size:" + counters.size() + ",input:" + str);
}
Modify its execute method by adding one print statement at the end.
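The counting logic in execute can be tried outside Storm. The sketch below is a stand-alone approximation: CounterSketch is a hypothetical class, and Map.merge replaces the containsKey/put branches, which should behave the same way for this use. It prints the same "Counter.size:...,input:..." line that the modified bolt writes to the worker log.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone version of the counting done in WordCounter.execute.
class CounterSketch {
    private final Map<String, Integer> counters = new HashMap<>();

    // Start a new word at 1, otherwise add 1 to its current count.
    void count(String word) {
        counters.merge(word, 1, Integer::sum);
        System.out.println("Counter.size:" + counters.size() + ",input:" + word);
    }

    // Copy of the current counts, so callers cannot modify the internal map.
    Map<String, Integer> snapshot() {
        return new HashMap<>(counters);
    }

    public static void main(String[] args) {
        CounterSketch counter = new CounterSketch();
        for (String w : "see me ok see you".split(" ")) {
            counter.count(w);
        }
        System.out.println(counter.snapshot());
    }
}
```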
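storm jar /opt/storm_user_lib/wc1.1.jar main.TopologyMain /opt/wc.txt 5000 2 distribute
Before running this, create wc.txt under /opt on node102 and node103; otherwise a file-not-found error is reported (strictly speaking, the file is only needed on the node where the task actually runs).
wc.txt (the contents can be customized):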
we are the future
see me
ok
see you
because what i am doing
what the fuck
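The file-not-found error mentioned above can be spotted before submitting by checking the input path on each worker node. The sketch below is a hypothetical helper (InputFileCheck is not part of the tutorial's code), and it only inspects the machine it runs on, so it would have to be run on every node that may host the spout.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical pre-flight check for the words file passed as args[0].
class InputFileCheck {

    // True only if the path is an existing, readable regular file on this machine.
    static boolean isUsable(String file) {
        Path p = Paths.get(file);
        return Files.isRegularFile(p) && Files.isReadable(p);
    }

    public static void main(String[] args) {
        // /opt/wc.txt is the path used in the storm jar command above.
        String file = args.length > 0 ? args[0] : "/opt/wc.txt";
        if (isUsable(file)) {
            System.out.println("OK: " + file + " is readable on this node");
        } else {
            System.out.println("Missing or unreadable on this node: " + file);
        }
    }
}
```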
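4) Check the results:
In the Storm monitoring UI, find the topology named wc1-1, locate its word-counter bolt, and check which node and worker port it runs on.
On node102, open the worker-6702.log file in the logs directory:
The output printed by cleanup does not appear here, however. That is because cleanup is called when the cluster shuts down, so once you shut the cluster down you can see the printed result. (How to shut the cluster down? Use jps to find the relevant processes and kill them directly with kill -9.) Note that Storm only guarantees the cleanup call in local mode, so on a real cluster the output may still be missing.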
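The lines written by the added print statement can be extracted from a worker log with a small parser. The sketch below assumes each relevant log line contains the exact "Counter.size:N,input:word" text produced by execute. The class CounterLogLine and the timestamp prefix in the sample line are made up for illustration; real worker log lines may carry a different prefix.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the "Counter.size:N,input:word" lines printed by WordCounter.
class CounterLogLine {
    private static final Pattern LINE =
            Pattern.compile("Counter\\.size:(\\d+),input:(.*)$");

    // Returns {size, word}, or null if the line does not contain the pattern.
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        return m.find() ? new String[] { m.group(1), m.group(2) } : null;
    }

    public static void main(String[] args) {
        // Sample line with an assumed prefix; only the tail matters to the parser.
        String line = "2015-03-01 10:00:00 STDIO [INFO] Counter.size:3,input:see";
        String[] parsed = parse(line);
        if (parsed != null) {
            System.out.println("size=" + parsed[0] + " word=" + parsed[1]);
        }
    }
}
```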
III. Storm Real-Life Example
Reference: Getting-Started-With-Storm-Jonathan-Leibiusky-Gabriel-E_1276.pdf, chapter 6
Versions: Storm 0.9.3; Redis 2.8.19; Node.js 0.12.0; jedis 2.6.2.
1) Download, build, and install Redis and Node.js:
redis
Download: http://redis.io/download
Installation errors: http://www.cnblogs.com/la-isla-bonita/p/3582751.html
nodejs
Download: https://nodejs.org/download/
public final static String REDIS_HOST = "node101";
public final static int REDIS_PORT = 6379;
public final static String WEBSERVER = "http://node101:3000/news";
Remove the statement below, which should be around line 21:
builder.setSpout("read-feed", new UsersNavigationSpout(), 2);
builder.setBolt("get-categ", new GetCategoryBolt(), 2)
        .shuffleGrouping("read-feed");
builder.setBolt("user-history", new UserHistoryBolt(), 2)
        .fieldsGrouping("get-categ", new Fields("user"));
builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(), 2)
        .fieldsGrouping("user-history", new Fields("product"));
Change the job submission to cluster submission:
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
or
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to [B
One fix is to increase the timeout by modifying the reconnect method in the two classes above:
public void reconnect() {
    //this.jedis = new Jedis(host, port);
    this.jedis = new Jedis(host, port, 10000); // raise the default timeout to 10 s (value is in milliseconds)
}
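Raising the timeout is the change described above. As a complement, and not something the original code does, a Redis call could also be retried a few times before giving up. The sketch below is plain Java with no Jedis dependency: RetrySketch and withRetry are hypothetical names, and the Supplier stands in for any Redis operation that throws a runtime exception such as JedisConnectionException.

```java
import java.util.function.Supplier;

// Hypothetical retry helper; not part of the tutorial's Redis classes.
class RetrySketch {

    // Runs the action up to maxAttempts times, pausing between failed attempts.
    // Rethrows the last failure if every attempt fails.
    static <T> T withRetry(Supplier<T> action, int maxAttempts, long pauseMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        throw last;
                    }
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated operation: fails twice with a timeout, then succeeds.
        String reply = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("Read timed out");
            }
            return "PONG";
        }, 5, 10);
        System.out.println(reply + " after " + calls[0] + " attempts");
    }
}
```

In the bolts, a call to reconnect() could be placed in the failure path before the next attempt; whether that is appropriate depends on how the Redis classes in the example manage their connection.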
c. NewsNotifierBolt.java
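After the changes in a and b, an error is still reported. This class needs to be modified as well: replace its HTTP-related imports with the corresponding classes from the http package bundled under Storm.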