设为首页 收藏本站
查看: 1248|回复: 0

[经验分享] (转)Kafka部署与代码实例

[复制链接]
发表于 2017-5-23 17:51:05 | 显示全部楼层 |阅读模式
转 http://shift-alt-ctrl.iteye.com/blog/1930791
 
 
kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.
    我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.
    其中kafka为0.8V,zookeeper为3.4.5V
 
一.Zookeeper集群构建
    我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.
    1) zk-0
调整配置文件:
Php代码   DSC0000.png


  • clientPort=2181  
  • server.0=127.0.0.1:2888:3888  
  • server.1=127.0.0.1:2889:3889  
  • server.2=127.0.0.1:2890:3890  
  • ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper
Java代码  


  • ./zkServer.sh start  

    2) zk-1
调整配置文件(其他配置和zk-0一只):
Php代码  


  • clientPort=2182  
  • ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper
 
Java代码  


  • ./zkServer.sh start  

    3) zk-2
调整配置文件(其他配置和zk-0一只):
Php代码  


  • clientPort=2183  
  • ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper
 
Java代码  


  • ./zkServer.sh start  

  
二. Kafka集群构建
    因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.
    1) kafka-0
    在config目录下修改配置文件为:
Java代码  


  • broker.id=0  
  • port=9092  
  • num.network.threads=2  
  • num.io.threads=2  
  • socket.send.buffer.bytes=1048576  
  • socket.receive.buffer.bytes=1048576  
  • socket.request.max.bytes=104857600  
  • log.dir=./logs  
  • num.partitions=2  
  • log.flush.interval.messages=10000  
  • log.flush.interval.ms=1000  
  • log.retention.hours=168  
  • #log.retention.bytes=1073741824  
  • log.segment.bytes=536870912  
  • ##replication机制,让每个topic的partitions在kafka-cluster中备份2个  
  • ##用来提高cluster的容错能力..  
  • default.replication.factor=1  
  • log.cleanup.interval.mins=10  
  • zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
  • zookeeper.connection.timeout.ms=1000000  

    因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。
Java代码  


  • > cd kafka-0  
  • > ./sbt update  
  • > ./sbt package  
  • > ./sbt assembly-package-dependency   

    其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:
Java代码  


  • > JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &  

    因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.
    2) kafka-1
Java代码  


  • broker.id=1  
  • port=9093  
  • ##其他配置和kafka-0保持一致  

    然后和kafka-0一样执行打包命令,然后启动此broker.
Java代码  


  • > JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &  

    仍然可以通过如下指令查看topic的"partition"/"replicas"的分布和存活情况.
Java代码  


  • > bin/kafka-list-topic.sh --zookeeper localhost:2181  
  • topic: my-replicated-topic  partition: 0    leader: 2   replicas: 1,2,0 isr: 2  
  • topic: test partition: 0    leader: 0   replicas: 0 isr: 0   

    到目前为止环境已经OK了,那我们就开始展示编程实例吧。[配置参数详解]
 
三.项目准备
    项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.
Java代码  


  • <dependencies>  
  •     <dependency>  
  •         <groupId>log4j</groupId>  
  •         <artifactId>log4j</artifactId>  
  •         <version>1.2.14</version>  
  •     </dependency>  
  •     <dependency>  
  •         <groupId>org.apache.kafka</groupId>  
  •         <artifactId>kafka_2.8.2</artifactId>  
  •         <version>0.8.0</version>  
  •         <exclusions>  
  •             <exclusion>  
  •                 <groupId>log4j</groupId>  
  •                 <artifactId>log4j</artifactId>  
  •             </exclusion>  
  •         </exclusions>  
  •     </dependency>  
  •     <dependency>  
  •         <groupId>org.scala-lang</groupId>  
  •         <artifactId>scala-library</artifactId>  
  •         <version>2.8.2</version>  
  •     </dependency>  
  •     <dependency>  
  •         <groupId>com.yammer.metrics</groupId>  
  •         <artifactId>metrics-core</artifactId>  
  •         <version>2.2.0</version>  
  •     </dependency>  
  •     <dependency>  
  •         <groupId>com.101tec</groupId>  
  •         <artifactId>zkclient</artifactId>  
  •         <version>0.3</version>  
  •     </dependency>  
  • </dependencies>  

 
四.Producer端代码
    1) producer.properties文件:此文件放在/resources目录下
Java代码  


  • #partitioner.class=  
  • ##broker列表可以为kafka server的子集,因为producer需要从broker中获取metadata  
  • ##尽管每个broker都可以提供metadata,此处还是建议,将所有broker都列举出来  
  • ##此值,我们可以在spring中注入过来  
  • ##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093  
  • ##,127.0.0.1:9093  
  • ##同步,建议为async  
  • producer.type=sync  
  • compression.codec=0  
  • serializer.class=kafka.serializer.StringEncoder  
  • ##在producer.type=async时有效  
  • #batch.num.messages=100  

    2) KafkaProducerClient.java代码样例
Java代码  


  • import java.util.ArrayList;  
  • import java.util.Collection;  
  • import java.util.List;  
  • import java.util.Properties;  
  •   
  • import kafka.javaapi.producer.Producer;  
  • import kafka.producer.KeyedMessage;  
  • import kafka.producer.ProducerConfig;  
  •   
  • /** 
  •  * User: guanqing-liu 
  •  */  
  • public class KafkaProducerClient {  
  •   
  •     private Producer<String, String> inner;  
  •       
  •     private String brokerList;//for metadata discovery,spring setter  
  •     private String location = "kafka-producer.properties";//spring setter  
  •       
  •     private String defaultTopic;//spring setter  
  •   
  •     public void setBrokerList(String brokerList) {  
  •         this.brokerList = brokerList;  
  •     }  
  •   
  •     public void setLocation(String location) {  
  •         this.location = location;  
  •     }  
  •   
  •     public void setDefaultTopic(String defaultTopic) {  
  •         this.defaultTopic = defaultTopic;  
  •     }  
  •   
  •     public KafkaProducerClient(){}  
  •       
  •     public void init() throws Exception {  
  •         Properties properties = new Properties();  
  •         properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));  
  •           
  •           
  •         if(brokerList != null) {  
  •             properties.put("metadata.broker.list", brokerList);  
  •         }  
  •   
  •         ProducerConfig config = new ProducerConfig(properties);  
  •         inner = new Producer<String, String>(config);  
  •     }  
  •   
  •     public void send(String message){  
  •         send(defaultTopic,message);  
  •     }  
  •       
  •     public void send(Collection<String> messages){  
  •         send(defaultTopic,messages);  
  •     }  
  •       
  •     public void send(String topicName, String message) {  
  •         if (topicName == null || message == null) {  
  •             return;  
  •         }  
  •         KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);  
  •         inner.send(km);  
  •     }  
  •   
  •     public void send(String topicName, Collection<String> messages) {  
  •         if (topicName == null || messages == null) {  
  •             return;  
  •         }  
  •         if (messages.isEmpty()) {  
  •             return;  
  •         }  
  •         List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();  
  •         int i= 0;  
  •         for (String entry : messages) {  
  •             KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);  
  •             kms.add(km);  
  •             i++;  
  •             if(i % 20 == 0){  
  •                 inner.send(kms);  
  •                 kms.clear();  
  •             }  
  •         }  
  •           
  •         if(!kms.isEmpty()){  
  •             inner.send(kms);  
  •         }  
  •     }  
  •   
  •     public void close() {  
  •         inner.close();  
  •     }  
  •   
  •     /** 
  •      * @param args 
  •      */  
  •     public static void main(String[] args) {  
  •         KafkaProducerClient producer = null;  
  •         try {  
  •             producer = new KafkaProducerClient();  
  •             //producer.setBrokerList("");  
  •             int i = 0;  
  •             while (true) {  
  •                 producer.send("test-topic""this is a sample" + i);  
  •                 i++;  
  •                 Thread.sleep(2000);  
  •             }  
  •         } catch (Exception e) {  
  •             e.printStackTrace();  
  •         } finally {  
  •             if (producer != null) {  
  •                 producer.close();  
  •             }  
  •         }  
  •   
  •     }  
  •   
  • }  

    3) spring配置
Java代码  


  • <bean id="kafkaProducerClient" class="com.test.kafka.KafkaProducerClient" init-method="init" destroy-method="close">  
  •     <property name="zkConnect" value="${zookeeper_cluster}"></property>  
  •     <property name="defaultTopic" value="${kafka_topic}"></property>  
  • </bean>  

 
五.Consumer端
1) consumer.properties:文件位于/resources目录下
Java代码  


  • ## 此值可以配置,也可以通过spring注入  
  • ##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
  • ##,127.0.0.1:2182,127.0.0.1:2183  
  • # timeout in ms for connecting to zookeeper  
  • zookeeper.connectiontimeout.ms=1000000  
  • #consumer group id  
  • group.id=test-group  
  • #consumer timeout  
  • #consumer.timeout.ms=5000  
  • auto.commit.enable=true  
  • auto.commit.interval.ms=60000  

    2) KafkaConsumerClient.java代码样例
Java代码  


  • package com.test.kafka;  
  • import java.nio.ByteBuffer;  
  • import java.nio.CharBuffer;  
  • import java.nio.charset.Charset;  
  • import java.util.HashMap;  
  • import java.util.List;  
  • import java.util.Map;  
  • import java.util.Properties;  
  • import java.util.concurrent.ExecutorService;  
  • import java.util.concurrent.Executors;  
  •   
  • import kafka.consumer.Consumer;  
  • import kafka.consumer.ConsumerConfig;  
  • import kafka.consumer.ConsumerIterator;  
  • import kafka.consumer.KafkaStream;  
  • import kafka.javaapi.consumer.ConsumerConnector;  
  • import kafka.message.Message;  
  • import kafka.message.MessageAndMetadata;  
  •   
  • /** 
  •  * User: guanqing-liu  
  •  */  
  • public class KafkaConsumerClient {  
  •   
  •     private String groupid; //can be setting by spring  
  •     private String zkConnect;//can be setting by spring  
  •     private String location = "kafka-consumer.properties";//配置文件位置  
  •     private String topic;  
  •     private int partitionsNum = 1;  
  •     private MessageExecutor executor; //message listener  
  •     private ExecutorService threadPool;  
  •       
  •     private ConsumerConnector connector;  
  •       
  •     private Charset charset = Charset.forName("utf8");  
  •   
  •     public void setGroupid(String groupid) {  
  •         this.groupid = groupid;  
  •     }  
  •   
  •     public void setZkConnect(String zkConnect) {  
  •         this.zkConnect = zkConnect;  
  •     }  
  •   
  •     public void setLocation(String location) {  
  •         this.location = location;  
  •     }  
  •   
  •     public void setTopic(String topic) {  
  •         this.topic = topic;  
  •     }  
  •   
  •     public void setPartitionsNum(int partitionsNum) {  
  •         this.partitionsNum = partitionsNum;  
  •     }  
  •   
  •     public void setExecutor(MessageExecutor executor) {  
  •         this.executor = executor;  
  •     }  
  •   
  •     public KafkaConsumerClient() {}  
  •   
  •     //init consumer,and start connection and listener  
  •     public void init() throws Exception {  
  •         if(executor == null){  
  •             throw new RuntimeException("KafkaConsumer,exectuor cant be null!");  
  •         }  
  •         Properties properties = new Properties();  
  •         properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));  
  •           
  •         if(groupid != null){  
  •             properties.put("groupid", groupid);  
  •         }  
  •         if(zkConnect != null){  
  •             properties.put("zookeeper.connect", zkConnect);  
  •         }  
  •         ConsumerConfig config = new ConsumerConfig(properties);  
  •   
  •         connector = Consumer.createJavaConsumerConnector(config);  
  •         Map<String, Integer> topics = new HashMap<String, Integer>();  
  •         topics.put(topic, partitionsNum);  
  •         Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);  
  •         List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);  
  •         threadPool = Executors.newFixedThreadPool(partitionsNum * 2);  
  •           
  •         //start  
  •         for (KafkaStream<byte[], byte[]> partition : partitions) {  
  •             threadPool.execute(new MessageRunner(partition));  
  •         }  
  •     }  
  •   
  •     public void close() {  
  •         try {  
  •             threadPool.shutdownNow();  
  •         } catch (Exception e) {  
  •             //  
  •         } finally {  
  •             connector.shutdown();  
  •         }  
  •   
  •     }  
  •   
  •     class MessageRunner implements Runnable {  
  •         private KafkaStream<byte[], byte[]> partition;  
  •   
  •         MessageRunner(KafkaStream<byte[], byte[]> partition) {  
  •             this.partition = partition;  
  •         }  
  •   
  •         public void run() {  
  •             ConsumerIterator<byte[], byte[]> it = partition.iterator();  
  •             while (it.hasNext()) {  
  •                 // connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用  
  •                 MessageAndMetadata<byte[], byte[]> item = it.next();  
  •                 try{  
  •                     executor.execute(new String(item.message(),charset));// UTF-8,注意异常  
  •                 }catch(Exception e){  
  •                     //  
  •                 }  
  •             }  
  •         }  
  •           
  •         public String getContent(Message message){  
  •             ByteBuffer buffer = message.payload();  
  •             if (buffer.remaining() == 0) {  
  •                 return null;  
  •             }  
  •             CharBuffer charBuffer = charset.decode(buffer);  
  •             return charBuffer.toString();  
  •         }  
  •     }  
  •   
  •     public static interface MessageExecutor {  
  •   
  •         public void execute(String message);  
  •     }  
  •   
  •     /** 
  •      * @param args 
  •      */  
  •     public static void main(String[] args) {  
  •         KafkaConsumerClient consumer = null;  
  •         try {  
  •             MessageExecutor executor = new MessageExecutor() {  
  •   
  •                 public void execute(String message) {  
  •                     System.out.println(message);  
  •                 }  
  •             };  
  •             consumer = new KafkaConsumerClient();  
  •               
  •             consumer.setTopic("test-topic");  
  •             consumer.setPartitionsNum(2);  
  •             consumer.setExecutor(executor);  
  •             consumer.init();  
  •         } catch (Exception e) {  
  •             e.printStackTrace();  
  •         } finally {  
  •              if(consumer != null){  
  •                  consumer.close();  
  •              }  
  •         }  
  •   
  •     }  
  •   
  • }  

3) spring配置(略)
 
    需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.
    在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379938-1-1.html 上篇帖子: 【Kafka四】Kakfa伪分布式安装 下篇帖子: kafka client端 producer
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表