设为首页 收藏本站
查看: 1390|回复: 0

[经验分享] storm笔记 与kafka的集成

[复制链接]

尚未签到

发表于 2017-5-23 18:03:52 | 显示全部楼层 |阅读模式
   storm与kafka的结合,即前端的采集程序将实时数据源源不断采集到队列中,而storm作为消费者拉取计算,是典型的应用场景。因此,storm的发布包中也包含了一个集成jar,支持从kafka读出数据,供storm应用使用。这里结合自己的应用做个简单总结。
  由于storm已经提供了storm-kafka,因此可以直接使用,使用kafka的低级api读取数据。如果有需要的话,自己实现也并不困难。使用方法如下:
  

// 设置kafka的zookeeper集群
BrokerHosts hosts = new ZkHosts("10.1.80.249:2181,10.1.80.250:2181,10.1.80.251:2181/kafka");
// 初始化配置信息
SpoutConfig conf = new SpoutConfig(hosts, "topic", "/zkroot","topo");
// 在topology中设置spout
builder.setSpout("kafka-spout", new KafkaSpout(conf));
  这里需要注意的是,spout会根据config的后面两个参数在zookeeper上为每个kafka分区创建保存读取偏移的节点,如:/zkroot/topo/partition_0。默认情况下,spout下会发射域名为bytes的binary数据,如果有需要,可以通过设置schema进行修改。
  如上面所示,使用起来还是很简单的,下面简单的分析一下实现细节。
  (1)初始化

/**
KafkaSpout.open
**/
// 初始化用于读写zookeeper的客户端对象_state
Map stateConf = new HashMap(conf);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);
// 初始化用于读取kafka数据coordinator,真正数据读取使用的是内部的PartitionManager
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
 
  (2)读取数据

/**
KafkaSpout.nextTuple
**/
// 通过各个分区对应的PartitionManager读取数据
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size();
// 调用manager的next方法读取数据并emit
EmitState state = managers.get(_currPartitionIndex).next(_collector);
}
// 提交读取到的位置到zookeeper
long now = System.currentTimeMillis();
if((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit();
}
  (3)ack和fail:

/**
KafkaSpout.ack
**/
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
//调用PartitionManager的ack
m.ack(id.offset);
}
/**
KafkaSpout.fail
**/
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
//调用PartitionManager的fail
m.fail(id.offset);
}
 

   可以看出,主要的逻辑都在PartitionManager这个类中。下面对它做个简单的分析:

  (1)构造:


//从zookeeper中读取上一次的偏移
Map<Object, Object> json = _state.readJSON(path);
//根据当前时间获取一个偏移
Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
//maxOffsetBehind为两个偏移的最大范围,如果超过这个范围,则用最新偏移覆盖读取偏移,两个偏移间的数据会被丢弃。如果不希望这样,应该将它设置成一个较大的值或者MAX_VALUE
if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
_committedTo = currentOffset;
}
//初始化当前偏移
_emittedToOffset = _committedTo;

   

  (2)next和fill:


/**
PartitionManager.next
**/
//调用fill填充待发送队列
if (_waitingToEmit.isEmpty()) {
fill();
}
//发送数据
while (true) {
MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if (tups != null) {
for (List<Object> tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
}
break;
} else {
ack(toEmit.offset);
}
}
/**
PartitionManager.fill
**/
//初始化当前偏移,读取消息
if (had_failed) {
//先处理失败的偏移
offset = failed.first();
} else {
offset = _emittedToOffset;
}
ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
for (MessageAndOffset msg : msgs) {
final Long cur_offset = msg.offset();
if (cur_offset < offset) {
// Skip any old offsets.
continue;
}
if (!had_failed || failed.contains(cur_offset)) {
numMessages += 1;
//将偏移添加到pending中
_pending.add(cur_offset);
//将消息添加到待发送中
_waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
if (had_failed) {
failed.remove(cur_offset);
}
}
}
   

  (3)ack和fail


/**
PartitionManager.ack
**/
//从_pending中移除该偏移,如果该偏移与当前偏移的差大于maxOffsetBehind,则清空pending
if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
// Too many things pending!
_pending.headSet(offset).clear();
} else {
_pending.remove(offset);
}
numberAcked++;
/**
PartitionManager.fail
**/
//将偏移添加到失败队列中
failed.add(offset);
numberFailed++;
 

   最后,加上一张图做个总结:

DSC0000.png
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379953-1-1.html 上篇帖子: Kafka学习之源代码环境搭建(eclipse) 下篇帖子: 004.Kafka消息存储和处理
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表