设为首页 收藏本站
查看: 1916|回复: 0

[经验分享] Kafka Consumer端的一些解惑

[复制链接]

尚未签到

发表于 2017-5-23 17:58:05 | 显示全部楼层 |阅读模式
  转载自:http://my.oschina.net/ielts0909/blog?catalog=263107&p=2

  最近一直忙着各种设计和文档,终于有时间来更新一点儿关于kafka的东西。之前有一篇文章讲述的是kafka Producer端的程序,也就是日志的生产者,这部分比较容易理解,业务系统将运行日志或者业务日志发送到broker中,由broker代为存储。那讲的是如何收集日志,今天要写的是如何获取日志,然后再做相关的处理。
  之前写过kafka是讲日志按照topic的形式存储,一个topic会按照partition存在同一个文件夹下,目录在config/server.properties中指定,具体的存储规则可以查看之前的文章:
  
1# The directory under which to store log files

2log.dir=/tmp/kafka-logs



  Consumer端的目的就是为了获取log日志,然后做进一步的处理。在这里我们可以将数据的处理按照需求分为两个方向,线上和线下,也可以叫实时和离线。实时处理部分类似于网站里的站短,有消息了马上就推送到前端,这是一种对实时性要求极高的模式,kafka可以做到,当然针对站短这样的功能还有更好的处理方式,我主要将kafka线上消费功能用在了实时统计上,处理一些如实时流量汇总、各系统实时吞吐量汇总等。
  这种应用,一般采用一个consumer中的一个group对应一个业务,配合多个producer提供数据,如下图模
式:

DSC0000.jpg
  
  采用这种方式处理很简单,采用官网上给的例子即可解决,只是由于版本的问题,代码稍作更改即可:
01package com.a2.kafka.consumer;

02

03import java.util.HashMap;

04import java.util.List;

05import java.util.Map;

06import java.util.Properties;

07import java.util.concurrent.ExecutorService;

08import java.util.concurrent.Executors;

09

10import kafka.consumer.Consumer;

11import kafka.consumer.ConsumerConfig;

12import kafka.consumer.KafkaStream;

13import kafka.javaapi.consumer.ConsumerConnector;

14import kafka.message.Message;

15import kafka.message.MessageAndMetadata;

16

17public classCommonConsumer {

18publicstaticvoid main(String[] args) {

19// specify some consumer properties

20Properties props =newProperties();

21props.put("zk.connect","192.168.181.128:2181");

22props.put("zk.connectiontimeout.ms","1000000");

23props.put("groupid","test_group");

24

25// Create the connection to the cluster

26ConsumerConfig consumerConfig =newConsumerConfig(props);

27ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

28

29Map<String, Integer> map=newHashMap<String,Integer>();

30map.put("test",2);

31// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume

32Map<String, List<KafkaStream<Message>>> topicMessageStreams =

33consumerConnector.createMessageStreams(map);

34List<KafkaStream<Message>> streams = topicMessageStreams.get("test");

35

36// create list of 4 threads to consume from each of the partitions

37ExecutorService executor = Executors.newFixedThreadPool(4);

38

39// consume the messages in the threads

40for(finalKafkaStream<Message> stream: streams) {

41executor.submit(newRunnable() {

42publicvoidrun() {

43for(MessageAndMetadata<Message> msgAndMetadata: stream) {

44// process message (msgAndMetadata.message())

45System.out.println(msgAndMetadata.message());

46}

47}

48});

49}

50}

51}



  这是一个user level的API,还有一个low level的API可以从官网找到,这里就不贴出来了。这个consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据打印出来,这是不是十分符合实时性的要求。
  当然这里会产生一个很严重的问题,如果你重启一下上面这个程序,那你连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。我已经把结论说出来了,要消费同一组数据,你可以采用不同的group
  
简单说下产生这个问题的原因,这个问题类似于transaction commit,在消息系统中都会有这样一个问题存在,数据消费状态这个信息到底存哪里。是存在consumer端,还是存在broker端。对于这样的争论,一般会出现三种情况:
  

  • At most once—this handles the first case described. Messages are immediately marked as consumed, so they can't be given out twice, but many failure scenarios may lead to losing messages.
  • At least once—this is the second case where we guarantee each message will be delivered at least once, but in failure cases may be delivered twice.
  • Exactly once—this is what people actually want, each message is delivered once and only once.
  第一种情况是将消费的状态存储在了broker端,一旦消费了就改变状态,但会因为网络原因少消费信息,第二种是存在两端,并且先在broker端将状态记为send,等consumer处理完之后将状态标记为consumed,但也有可能因为在处理消息时产生异常,导致状态标记错误等,并且会产生性能的问题。第三种当然是最好的结果。
  Kafka解决这个问题采用high water mark这样的标记,也就是设置offset:
1Kafka does two unusual things with respect to metadata. First the stream is partitioned on the brokers into asetof distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifieswhichpartition a message belongs to. Within a partition messages are storedinthe order inwhichthey arrive at the broker, and will be given out to consumersinthat same order. This means that rather than store metadataforeach message (marking it as consumed, say), we just need to store the"high water mark"foreach combination of consumer, topic, and partition. Hence the total metadata required to summarize the state of the consumer is actually quite small. In Kafka we refer to this high-water mark as"theoffset" for reasons that will becomeclear in the implementation section.



  所以在每次消费信息时,log4j中都会输出不同的offset:
1[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0start fetching topic: test part:0offset: 0from 192.168.181.128:9092

2

3[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0start fetching topic: test part:0offset: 15from 192.168.181.128:9092



除了采用不同的groupid去抓取已经消费过的数据,kafka还提供了另一种思路,这种方式更适合线下的操作,镜像。
DSC0001.jpg

  通过一些配置,就可以将线上产生的数据同步到镜像中去,然后再由特定的集群区处理大批量的数据,这种方式可以采用low level的API按照不同的partition和offset来抓取数据,以获得更好的并行处理效果。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379947-1-1.html 上篇帖子: Kafka JAVA客户端代码示例 下篇帖子: Kafka 转移分区分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表