[Experience Sharing] Apache Kafka Series (Part 4): Multi-threaded Consumer Solutions


  • Apache Kafka Series (Part 1): Getting Started
  • Apache Kafka Series (Part 2): Command-Line Tools (CLI)
  • Apache Kafka Series (Part 3): Using the Java API
  • Apache Kafka Series (Part 4): Multi-threaded Consumer Solutions
  • Apache Kafka Series (Part 5): Kafka Connect and a FileConnector Example
  The figures in this article are screenshots taken from PPT slides; if you have suggestions for improvements, please contact me.

1. Why the Consumer Needs Multiple Threads
  Suppose we are building a message notification module that lets users subscribe to notifications/messages sent by other users, and that this module is built on Apache Kafka. Publishers write messages to the Kafka cluster through the Producer API, and subscribers read them through the Consumer API. The initial architecture looks like this:
[Figure: initial architecture — Producers write to the Kafka Cluster, a single Consumer reads from it]

  However, as the number of users grows, the volume of notifications grows with it. At some point the rate at which Producers publish exceeds the rate at which the Consumer can consume, and unconsumed messages start to pile up on the Broker. Even though Kafka persists unconsumed messages very well, its retention limits (time, partition size, message key compaction) mean that messages that are never consumed will eventually be deleted permanently. From a business point of view, a notification system with high latency is close to useless. A multi-threaded Consumer model is therefore necessary.

2. Multi-threaded Kafka Consumer Models
  There are two types of multi-threaded Consumer models:


  • Model 1: multiple Consumers, each running in its own thread. The corresponding architecture is shown below:
[Figure: Model 1 architecture — multiple Consumers, one thread per Consumer]



  • Model 2: a single Consumer with multiple worker threads
[Figure: Model 2 architecture — one Consumer feeding a pool of worker threads]

  The advantages and disadvantages of the two approaches compare as follows (a minimal sketch of how Model 2 can still preserve per-partition ordering follows the comparison):

  Model 1
  Advantages:
    1. A Consumer Group is easy to implement.
    2. Preserving the ordering of each Partition is easier.
  Disadvantages:
    1. The number of Consumers cannot exceed the number of Partitions; otherwise the extra Consumers will never be used.
    2. Each Consumer needs its own TCP connection, which costs a considerable amount of system resources.

  Model 2
  Advantages:
    1. Because the Consumer work is handled by a thread pool, horizontal scaling is easier.
  Disadvantages:
    1. Ordered processing within each Partition is harder to achieve. For example, if two pending Messages from the same Partition are consumed by two threads of the pool, those two threads must synchronize with each other.
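  To illustrate the Model 2 ordering problem, one common mitigation (not part of this article's code, only a minimal sketch) is to pick the worker from the record's partition number, so that all records of the same Partition are always processed by the same single-threaded executor and therefore stay in order. The class name PartitionOrderedDispatcher is hypothetical:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical sketch: records from the same partition always land on the same
// single-threaded executor, so their relative order is preserved.
public class PartitionOrderedDispatcher {
    private final ExecutorService[] workers;

    public PartitionOrderedDispatcher(int threadNumber) {
        workers = new ExecutorService[threadNumber];
        for (int i = 0; i < threadNumber; i++) {
            // each worker runs its tasks sequentially
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void dispatch(final ConsumerRecord<String, String> record) {
        // same partition -> same worker -> sequential processing per partition
        int index = record.partition() % workers.length;
        workers[index].submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("Consumer Message:" + record.value()
                        + ",Partition:" + record.partition()
                        + ",Offset:" + record.offset());
            }
        });
    }

    public void shutdown() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
    }
}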

3. Code Implementation

3.1 Prerequisites




    • Kafka Broker 0.11.0
    • JDK 1.8
    • IntelliJ IDEA
    • Maven 3
    • For setting up the Kafka environment and creating/modifying Topics, please refer to the earlier articles in this series.

3.2 Source Code Structure
[Figure: project structure — packages com.randy.consumergroup and com.randy.consumerthread, plus ProducerThread.java]

  The consumergroup package contains the code for Model 1, the consumerthread package contains the code for Model 2, and ProducerThread is the producer code.

3.3 pom.xml
  

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.randy</groupId>
  <artifactId>kafka_multithread_consumer_model</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>kafka_multithread_consumer_model Maven Webapp</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>kafka_multithread_consumer_model</finalName>
  </build>
</project>
  


3.4 Approach 1: Consumer Group
  ProducerThread.java is a producer thread that sends messages to the Broker.
  ConsumerThread.java is a consumer thread used to consume messages.
  ConsumerGroup.java creates a group of consumer threads.
  ConsumerGroupMain.java is the entry class.
  3.4.1 ProducerThread.java


package com.randy;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  11:41
 * Comment : producer thread that keeps sending messages to the given topic
 */
public class ProducerThread implements Runnable {
    private final Producer<String, String> kafkaProducer;
    private final String topic;

    public ProducerThread(String brokers, String topic) {
        Properties properties = buildKafkaProperty(brokers);
        this.topic = topic;
        this.kafkaProducer = new KafkaProducer<String, String>(properties);
    }

    private static Properties buildKafkaProperty(String brokers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    @Override
    public void run() {
        System.out.println("start sending message to kafka");
        int i = 0;
        while (true) {
            String sendMsg = "Producer message number:" + String.valueOf(++i);
            // send asynchronously; the callback reports the partition and offset
            kafkaProducer.send(new ProducerRecord<String, String>(topic, sendMsg), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("Producer Message: Partition:" + recordMetadata.partition()
                                + ",Offset:" + recordMetadata.offset());
                    }
                }
            });
            // thread sleep 3 seconds every time
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end sending message to kafka");
        }
    }
}
  


3.4.2 ConsumerThread.java


  

package com.randy.consumergroup;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  12:03
 * Comment : one consumer per thread
 */
public class ConsumerThread implements Runnable {
    // KafkaConsumer is not thread-safe, so every thread must own its own
    // (non-static) instance
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final String topic;

    public ConsumerThread(String brokers, String groupId, String topic) {
        Properties properties = buildKafkaProperty(brokers, groupId);
        this.topic = topic;
        this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topic));
    }

    private static Properties buildKafkaProperty(String brokers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> item : consumerRecords) {
                System.out.println("Consumer Message:" + item.value() + ",Partition:" + item.partition()
                        + ",Offset:" + item.offset());
            }
        }
    }
}
  


3.4.3 ConsumerGroup.java


  

package com.randy.consumergroup;

import java.util.ArrayList;
import java.util.List;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  14:09
 * Comment : creates and starts a group of ConsumerThread instances
 */
public class ConsumerGroup {
    private final String brokers;
    private final String groupId;
    private final String topic;
    private final int consumerNumber;
    private List<ConsumerThread> consumerThreadList = new ArrayList<ConsumerThread>();

    public ConsumerGroup(String brokers, String groupId, String topic, int consumerNumber) {
        this.groupId = groupId;
        this.topic = topic;
        this.brokers = brokers;
        this.consumerNumber = consumerNumber;
        for (int i = 0; i < consumerNumber; i++) {
            ConsumerThread consumerThread = new ConsumerThread(brokers, groupId, topic);
            consumerThreadList.add(consumerThread);
        }
    }

    public void start() {
        for (ConsumerThread item : consumerThreadList) {
            Thread thread = new Thread(item);
            thread.start();
        }
    }
}
  


3.4.4 ConsumerGroupMain.java


  

package com.randy.consumergroup;

import com.randy.ProducerThread;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  14:18
 * Comment : entry point for Approach 1
 */
public class ConsumerGroupMain {

    public static void main(String[] args) {
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        // start a producer so there is something to consume
        Thread producerThread = new Thread(new ProducerThread(brokers, topic));
        producerThread.start();

        // start three consumer threads in the same consumer group
        ConsumerGroup consumerGroup = new ConsumerGroup(brokers, groupId, topic, consumerNumber);
        consumerGroup.start();
    }
}
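  A note on stopping Approach 1: the run() loop in ConsumerThread never exits. A common way to stop such a consumer cleanly is to call KafkaConsumer.wakeup() from another thread, which makes the blocking poll() throw a WakeupException. The following is only a minimal sketch of that idea; the class StoppableConsumerThread and its shutdown() method are hypothetical and not part of the repository code:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// Hypothetical variant of ConsumerThread with a clean shutdown path.
public class StoppableConsumerThread implements Runnable {
    private final KafkaConsumer<String, String> kafkaConsumer;

    public StoppableConsumerThread(Properties properties, String topic) {
        this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Consumer Message:" + record.value()
                            + ",Partition:" + record.partition()
                            + ",Offset:" + record.offset());
                }
            }
        } catch (WakeupException e) {
            // expected when shutdown() is called; fall through and close
        } finally {
            kafkaConsumer.close();
        }
    }

    // safe to call from another thread: interrupts a blocking poll()
    public void shutdown() {
        kafkaConsumer.wakeup();
    }
}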
  


3.5 Approach 2: Multi-threaded Consumer
  ConsumerThreadHandler.java processes the messages handed to the consumer's worker threads.
  ConsumerThread.java is the consumer; it uses a thread pool to run the worker threads.
  ConsumerThreadMain.java is the entry class.
  3.5.1 ConsumerThreadHandler.java


  

package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  16:29
 * Comment : worker task that processes a single record from the pool
 */
public class ConsumerThreadHandler implements Runnable {
    private ConsumerRecord consumerRecord;

    public ConsumerThreadHandler(ConsumerRecord consumerRecord) {
        this.consumerRecord = consumerRecord;
    }

    @Override
    public void run() {
        System.out.println("Consumer Message:" + consumerRecord.value() + ",Partition:" + consumerRecord.partition()
                + ",Offset:" + consumerRecord.offset());
    }
}
  


3.5.2 ConsumerThread.java


  

package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  16:42
 * Comment : single consumer that hands records off to a pool of worker threads
 */
public class ConsumerThread {

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    // thread pool of workers
    private ExecutorService executor;

    public ConsumerThread(String brokers, String groupId, String topic) {
        Properties properties = buildKafkaProperty(brokers, groupId);
        this.consumer = new KafkaConsumer<>(properties);
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    public void start(int threadNumber) {
        executor = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true) {
            // poll on this single consumer thread, then fan the records out to the pool
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String, String> item : consumerRecords) {
                executor.submit(new ConsumerThreadHandler(item));
            }
        }
    }

    private static Properties buildKafkaProperty(String brokers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}
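  One design caveat of Approach 2: with enable.auto.commit set to true, offsets may be committed before the worker threads have actually finished processing their records, so messages can be lost if the process crashes in between. Below is only a minimal sketch of a safer variant; it assumes the consumer is created with enable.auto.commit=false, reuses the ConsumerThreadHandler from 3.5.1, and the class name BatchCommitLoop is hypothetical:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.randy.consumerthread.ConsumerThreadHandler;

// Hypothetical sketch: commit offsets only after every record of the polled
// batch has been handled by the worker pool.
public class BatchCommitLoop {

    public static void pollAndCommit(KafkaConsumer<String, String> consumer,
                                     ExecutorService executor) throws Exception {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            List<Future<?>> pending = new ArrayList<Future<?>>();
            for (ConsumerRecord<String, String> record : records) {
                pending.add(executor.submit(new ConsumerThreadHandler(record)));
            }
            // wait until every record of this batch has been processed
            for (Future<?> future : pending) {
                future.get();
            }
            if (!records.isEmpty()) {
                // only now is it safe to mark the batch as consumed
                consumer.commitSync();
            }
        }
    }
}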
  


3.5.3 ConsumerThreadMain.java


  

package com.randy.consumerthread;

import com.randy.ProducerThread;

/**
 * Author  : RandySun (sunfeng152157@sina.com)
 * Date    : 2017-08-20  16:49
 * Comment : entry point for Approach 2
 */
public class ConsumerThreadMain {

    public static void main(String[] args) {
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        // start a producer so there is something to consume
        Thread producerThread = new Thread(new ProducerThread(brokers, topic));
        producerThread.start();

        // one consumer that dispatches records to a pool of worker threads
        ConsumerThread consumerThread = new ConsumerThread(brokers, groupId, topic);
        consumerThread.start(consumerNumber);
    }
}
  


4. Summary
  This article presented two different consumer models; each has its own pros and cons. All of the code has been uploaded to https://github.com/qizhelongdeyang/kafka_multithread_consumer_model.git . If you have questions or find mistakes, please point them out.
