设为首页 收藏本站
查看: 1198|回复: 0

[经验分享] Apache Storm技术实战之2

[复制链接]
发表于 2015-8-3 11:13:42 | 显示全部楼层 |阅读模式
  欢迎转载,转载请注明出处,徽沪一郎.
  本文通过BasicDRPCTopology的实例来分析DRPCTopology在提交的时候, Topology中究竟含有哪些内容?

BasicDRPCTopology

main函数
  DRPC 分布式远程调用(这个说法有意思,远程调用本来就是分布的,何须再加个D, 看多了, :)
  



public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
Config conf = new Config();
if (args == null || args.length == 0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
for (String word : new String[]{ "hello", "goodbye" }) {
System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
}
cluster.shutdown();
drpc.shutdown();
}
else {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
}
}
  
  问题: 上面的代码中只是添加了一个bolt,并没有设定Spout. 我们知道一个topology中最起码得有一个Spout,那么这里的Spout又隐身于何处呢?
  关键的地方就在builder.createLocalTopology, 调用关系如下


  • LinearDRPCTopologyBuilder::createLocalTopology

    •   LinearDRPCTopologyBuilder::createTopology()

      •   LinearDRPCTopologyBuilder::createTopology(new DRPCSpout(_function))
        




  原来DRPCTopology中使用的Spout是DRPCSpout.

LinearDRPCTopology::createTopology
  既然代码已经读到此处,何不再进一步看看createTopology的实现.
  简要说明一下该段代码的处理逻辑:


  • 设置DRPCSpout
  • 以bolt为入参,创建CoordinatedBolt
  • 添加JoinResult Bolt
  • 添加ReturnResult Bolt: ReturnResultBolt连接到DRPCServer,并返回结果



    private StormTopology createTopology(DRPCSpout spout) {
final String SPOUT_ID = "spout";
final String PREPARE_ID = "prepare-request";
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID, spout);
builder.setBolt(PREPARE_ID, new PrepareRequest())
.noneGrouping(SPOUT_ID);
int i=0;
for(; i=2) {
source.put(boltId(i-1), SourceArgs.all());
}
IdStreamSpec idSpec = null;
if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) {
idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
}
BoltDeclarer declarer = builder.setBolt(
boltId(i),
new CoordinatedBolt(component.bolt, source, idSpec),
component.parallelism);
for(Map conf: component.componentConfs) {
declarer.addConfigurations(conf);
}
if(idSpec!=null) {
declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
}
if(i==0 && component.declarations.isEmpty()) {
declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
} else {
String prevId;
if(i==0) {
prevId = PREPARE_ID;
} else {
prevId = boltId(i-1);
}
for(InputDeclaration declaration: component.declarations) {
declaration.declare(prevId, declarer);
}
}
if(i>0) {
declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID);
}
}
IRichBolt lastBolt = _components.get(_components.size()-1).bolt;
OutputFieldsGetter getter = new OutputFieldsGetter();
lastBolt.declareOutputFields(getter);
Map streams = getter.getFieldsDeclaration();
if(streams.size()!=1) {
throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
}
String outputStream = streams.keySet().iterator().next();
List fields = streams.get(outputStream).get_output_fields();
if(fields.size()!=2) {
throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
}
builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
.fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0)))
.fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
i++;
builder.setBolt(boltId(i), new ReturnResults())
.noneGrouping(boltId(i-1));
return builder.createTopology();
}
  

Bolt
  处理逻辑: 在接收到的每一个单词后面添加'!'.



public static class ExclaimBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
运行



java -cp $(lein classpath) storm.starter.BasicDRPCTopology
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-93638-1-1.html 上篇帖子: 转:apache bench使用简介 下篇帖子: VPS CentOS安装Apache+PHP+MySQL指南
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表