设为首页 收藏本站
查看: 881|回复: 0

[经验分享] kafka实时监控

[复制链接]

尚未签到

发表于 2017-5-23 15:22:37 | 显示全部楼层 |阅读模式
  在kafka的开发和维护中,我们经常需要了解kafka topic以及连接在其上的consumer的实时信息,比如logsize,offset,owner等。为此kafka提供了ConsumerOffsetChecker,它的用法很简单

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group <group>
  输出结果类似于

Group Topic Pid Offset logSize Lag Owner
Group1 a.topic 0 2 2 0 none
Group1 a.topic 1 0 0 0 none
Group1 a.topic 2 2 2 0 none
  我们也可以通过kafka web console一类的工具直观地获取kafka信息,但如果我们要构建自己的监控系统,需要抓取这些信息的话,有两种办法:一种是运行ConsumerOffsetChecker然后解析输出的字符串;另一种就是通过SimpleConsumer和Zookeeper实时抓取信息(换句话说就是把ConsumerOffsetChecker翻译一下:)),以下介绍第二种方法的思路。
  首先我们看kafka信息在zookeeper的存储结构

1,/brokers/topics/[topic]/partitions/[partitionId]/state
2,/brokers/ids/[0...N]
3,/consumers/[groupId]/ids/[consumerId]
4,/consumers/[groupId]/owners/[topic]/[partitionId]
5,/consumers/[groupId]/offsets/[topic]/[partitionId]
  对于指定的topic和groupid,通过(1)可以拿到所有的partition信息(Pid),然后通过(5)可以拿到offset,通过(4)可以拿到owner。就差logsize还没法拿到,事实上logsize在zookeeper中并没有记录,它必须通过kafka consumer的low level的api取得。

private Long getLogSize(String topicName, String partitionId, long offsetRequestTime) {
SimpleConsumer consumer = new SimpleConsumer(server, port, 10000, 1024000, "ConsumerOffsetChecker");
Long logSize = null;
TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, Integer.parseInt(partitionId));
Map<TopicAndPartition, PartitionOffsetRequestInfo> map = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
map.put(topicAndPartition, new PartitionOffsetRequestInfo(offsetRequestTime, 1));
OffsetRequest request = new OffsetRequest(map, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] aa = response.offsets(topicName, Integer.parseInt(partitionId));
if (aa.length != 0) {
logSize = aa[0];
}
return logSize;
}
  第二行,创建simple consumer
  第四行,创建topic和partition对象
  第六行,创建offset request,第一个参数是记录写入的时间,如果是kafka.api.OffsetRequest.EarliestTime(),则代表当前最早的一条记录,也就是当前最小offset;如果是kafka.api.OffsetRequest.LatestTime(),则代表最新的一条记录,也就是当前最大offset。第二个参数是获取offset的个数。
  由max offset和current offset,我们可以获得当前还有多少消息没有被消费(lag),由(lag/(maxoffset-minoffset)),我们可以算出当前还没有被消费的消息占的百分比,如果这个百分比接近100%,那么接下来很可能会导致offset out of range exception而丢失数据。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379808-1-1.html 上篇帖子: kafka开发实例 下篇帖子: kafka伪分布式安装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表