设为首页 收藏本站
查看: 1294|回复: 0

[经验分享] Apache Kafka系列(三) Java API使用

[复制链接]

尚未签到

发表于 2017-12-24 13:51:11 | 显示全部楼层 |阅读模式

  • Apache Kafka系列(一) 起步
  • Apache Kafka系列(二) 命令行工具(CLI)
  • Apache Kafka系列(三) Java API使用
  • Apache Kafka系列(四) 多线程Consumer方案
  • Apache Kafka系列(五) Kafka Connect及FileConnector示例
摘要:
  Apache Kafka Java Client API

一、基本概念
  Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如:
  1.创建Topic
  2.罗列出已存在的Topic
  3.对已有Topic的Produce/Consume测试
  跟其他的消息系统一样,Kafka提供了多种不用语言实现的客户端API,如:Java,Python,Ruby,Go等。这些API极大的方便用户使用Kafka集群,本文将展示这些API的使用

二、前提


  • 在本地虚拟机中安装了Kafka 0.11.0版本,可以参照前一篇文章:  Apache Kafka系列(一) 起步
  • 本地安装有JDK1.8
  • IDEA编译器
  • Maven3
三、项目结构
  Maven pom.xml如下:
  

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation
="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>
  

  <groupId>com.randy</groupId>
  <artifactId>kafka_api_demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>Maven</name>
  

  <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  

  <dependencies>
  <dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.11.0.0</version>
  </dependency>
  </dependencies>
  
</project>
  


四、源码
  4.1 Producer的源码    
  

package com.randy;  

  

import java.util.Properties;  

  

import org.apache.kafka.clients.producer.KafkaProducer;  

import org.apache.kafka.clients.producer.Producer;  

import org.apache.kafka.clients.producer.ProducerRecord;  

  

  

/**  * Author  : RandySun
  * Date    : 2017-08-13  16:23
  * Comment :
*/
  
public>  

  public static void main(String[] args){
  Properties properties = new Properties();
  properties.put("bootstrap.servers", "192.168.1.110:9092");
  properties.put("acks", "all");
  properties.put("retries", 0);
  properties.put("batch.size", 16384);
  properties.put("linger.ms", 1);
  properties.put("buffer.memory", 33554432);
  properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  Producer<String, String> producer = null;
  try {
  producer = new KafkaProducer<String, String>(properties);
  for (int i = 0; i < 100; i++) {
  String msg = "Message " + i;
  producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
  System.out.println("Sent:" + msg);
  }
  } catch (Exception e) {
  e.printStackTrace();
  

  } finally {
  producer.close();
  }
  

  }
  
}
  

  可以使用KafkaProducer类的实例来创建一个Producer,KafkaProducer类的参数是一系列属性值,下面分析一下所使用到的重要的属性:


  • bootstrap.servers
  

properties.put("bootstrap.servers", "192.168.1.110:9092");  

  bootstrap.servers是Kafka集群的IP地址,如果Broker数量超过1个,则使用逗号分隔,如"192.168.1.110:9092,192.168.1.110:9092"。其中,192.168.1.110是我的其中一台虚拟机的
  IP地址,9092是所监听的端口


  • key.serializer   &  value.serializer
  

        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  properties.put(
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  

  序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消
  息序列化为二进制类型。本例是发送文本消息到Kafka集群,所以使用的是StringSerializer。


  • 发送Message到Kafka集群
  

   for (int i = 0; i < 100; i++) {  String msg
= "Message " + i;  producer.send(
new ProducerRecord<String, String>("HelloWorld", msg));  System.out.println(
"Sent:" + msg);  }
  

  上述代码会发送100个消息到HelloWorld这个Topic
  4.2 Consumer的源码
  

package com.randy;  

  

import java.util.Arrays;  

import java.util.Properties;  

  

import org.apache.kafka.clients.consumer.ConsumerRecord;  

import org.apache.kafka.clients.consumer.ConsumerRecords;  

import org.apache.kafka.clients.consumer.KafkaConsumer;  

  

/**  * Author  : RandySun
  * Date    : 2017-08-13  17:06
  * Comment :
*/
  
public>  

  public static void main(String[] args){
  Properties properties = new Properties();
  properties.put("bootstrap.servers", "192.168.1.110:9092");
  properties.put("group.id", "group-1");
  properties.put("enable.auto.commit", "true");
  properties.put("auto.commit.interval.ms", "1000");
  properties.put("auto.offset.reset", "earliest");
  properties.put("session.timeout.ms", "30000");
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  

  KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
  while (true) {
  ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
  System.out.printf("offset = %d, value = %s", record.offset(), record.value());
  System.out.println();
  }
  }
  

  }
  
}
  

  可以使用KafkaConsumer类的实例来创建一个Consumer,KafkaConsumer类的参数是一系列属性值,下面分析一下所使用到的重要的属性:


  • bootstrap.servers
  和Producer一样,是指向Kafka集群的IP地址,以逗号分隔。


  • group.id
  Consumer分组ID


  • key.deserializer and value.deserializer
       发序列化。Consumer把来自Kafka集群的二进制消息反序列化为指定的类型。因本例中的Producer使用的是String类型,所以调用StringDeserializer来反序列化
  Consumer订阅了Topic为HelloWorld的消息,Consumer调用poll方法来轮循Kafka集群的消息,其中的参数100是超时时间(Consumer等待直到Kafka集群中没有消息为止):
  

        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));while (true) {  ConsumerRecords
<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {  System.out.printf(
"offset = %d, value = %s", record.offset(), record.value());  System.out.println();
  }
  }
  


五、总结
  本文展示了如何创建一个Producer并生成String类型的消息,Consumer消费这些消息。这些都是基于Apache Kafka 0.11.0 Java API。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-427523-1-1.html 上篇帖子: 【译】Apache Flink Kafka consumer 下篇帖子: Apache Kafka系列(四) 多线程Consumer方案
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表