(Repost) Kafka Deployment and Code Examples
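Reposted from: http://shift-alt-ctrl.iteye.com/blog/1930791

Kafka serves as a distributed log-collection and system-monitoring service, and it is worth using where it fits. Deploying Kafka involves a ZooKeeper environment and a Kafka environment, plus some configuration. The rest of this post shows how to set it up and use it.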
We build a ZooKeeper cluster from 3 ZooKeeper instances and a Kafka cluster from 2 Kafka brokers.
Versions: Kafka 0.8, ZooKeeper 3.4.5.
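I. Building the ZooKeeper Cluster
We run three ZooKeeper instances, zk-0, zk-1 and zk-2, all on the local machine (127.0.0.1). If you are only testing, a single ZooKeeper instance is enough.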
1) zk-0
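Edit the configuration file (conf/zoo.cfg):

```properties
clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
## Besides the lines above, each instance needs its own dataDir containing a myid file
## whose content is the instance's N from server.N (0 here); other settings keep their defaults
```

Start ZooKeeper:

```shell
./zkServer.sh start
```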
2) zk-1
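Edit the configuration file (apart from the lines below, it is the same as zk-0):

```properties
clientPort=2182
## dataDir must be this instance's own directory, with a myid file containing 1;
## other settings keep their defaults
```

Start ZooKeeper:

```shell
./zkServer.sh start
```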
3) zk-2
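Edit the configuration file (apart from the lines below, it is the same as zk-0):

```properties
clientPort=2183
## dataDir must be this instance's own directory, with a myid file containing 2;
## other settings keep their defaults
```

Start ZooKeeper:

```shell
./zkServer.sh start
```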
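Keeping three zoo.cfg files and three myid files consistent by hand is easy to get wrong. A minimal sketch that generates them, assuming each of zk-0, zk-1 and zk-2 is a separate ZooKeeper installation directory under one common root (ZkEnsembleConfig and this directory layout are hypothetical, not from the original post); tickTime, initLimit and syncLimit are the values from ZooKeeper's zoo_sample.cfg:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hypothetical helper: renders zoo.cfg and myid for a 3-node ensemble on 127.0.0.1,
 * using the same client, quorum and election ports as the configs above.
 */
final class ZkEnsembleConfig {
    static final int SIZE = 3;

    /** zoo.cfg content for instance id (0, 1 or 2). */
    static String zooCfg(int id, String dataDir) {
        StringBuilder sb = new StringBuilder();
        // Timing values as shipped in zoo_sample.cfg
        sb.append("tickTime=2000\n");
        sb.append("initLimit=10\n");
        sb.append("syncLimit=5\n");
        // Instances on the same host must not share a data directory
        sb.append("dataDir=").append(dataDir).append('\n');
        sb.append("clientPort=").append(2181 + id).append('\n'); // 2181, 2182, 2183
        // The server list is identical on every instance
        for (int n = 0; n < SIZE; n++) {
            sb.append("server.").append(n).append("=127.0.0.1:")
              .append(2888 + n).append(':').append(3888 + n).append('\n');
        }
        return sb.toString();
    }

    /** Writes zk-N/conf/zoo.cfg and zk-N/data/myid under root. */
    static void write(Path root) throws IOException {
        for (int id = 0; id < SIZE; id++) {
            Path home = root.resolve("zk-" + id);
            Path conf = home.resolve("conf");
            Path data = home.resolve("data");
            Files.createDirectories(conf);
            Files.createDirectories(data);
            // zoo.cfg is read like a properties file, so avoid Windows backslashes
            String dataDir = data.toAbsolutePath().toString().replace('\\', '/');
            Files.write(conf.resolve("zoo.cfg"), zooCfg(id, dataDir).getBytes(StandardCharsets.UTF_8));
            // myid holds the N of this instance's server.N line
            Files.write(data.resolve("myid"), String.valueOf(id).getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        write(Paths.get(args.length > 0 ? args[0] : "."));
    }
}
```

Run it with the parent directory of the three installations as the argument, then start each instance with ./zkServer.sh start from its bin directory.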
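II. Building the Kafka Cluster
Because the broker configuration refers to the ZooKeeper setup above, we start with the broker configuration files. The cluster consists of 2 Kafka brokers, kafka-0 and kafka-1.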
1) kafka-0
Edit config/server.properties as follows:

```properties
broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
## Replication: keep 2 copies of each (auto-created) topic's partitions in the cluster,
## one on each broker, to improve fault tolerance; both brokers must be up when a topic is created
default.replication.factor=2
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
```
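Kafka is written in Scala. When working from the source distribution, build it first with the bundled sbt script, which downloads the Scala dependencies itself (a prebuilt binary package can skip this step):

```shell
> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
```

The last command may throw an error; you can ignore it for now. Start the Kafka broker:

```shell
> JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &
```

Since ZooKeeper is already up and running, there is no need to start the ZooKeeper bundled with Kafka. If several Kafka brokers run on one machine, give each one its own JMX_PORT, otherwise they all try to use the same JMX port.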
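The JMX_PORT is what monitoring tools connect to. A minimal sketch using only the JDK's javax.management API that lists a broker's Kafka MBeans; BrokerJmxProbe is a hypothetical name, the default port 9997 matches kafka-0 above, and it assumes the broker's JMX endpoint has authentication and SSL disabled (check KAFKA_JMX_OPTS in bin/kafka-run-class.sh for your version):

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Hypothetical probe: connects to a broker's JMX port and prints its Kafka MBean names. */
final class BrokerJmxProbe {

    /** Standard RMI-based JMX service URL for a JVM started with a JMX port. */
    static String jmxUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9997; // kafka-0's JMX_PORT
        JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl("127.0.0.1", port)));
        try {
            MBeanServerConnection conn = jmxc.getMBeanServerConnection();
            // Domain wildcard: every MBean whose domain starts with "kafka"
            Set<ObjectName> names = conn.queryNames(new ObjectName("kafka*:*"), null);
            for (ObjectName name : names) {
                System.out.println(name);
            }
        } finally {
            jmxc.close();
        }
    }
}
```

Running it once with 9997 and once with 9998 shows that the two brokers are reachable as separate JVMs.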
2) kafka-1
```properties
broker.id=1
port=9093
## All other settings are the same as kafka-0
```

Then run the same build commands as for kafka-0 and start this broker:

```shell
> JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties &
```
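Since the two brokers share everything except broker.id and port, their files can be derived from one base. A sketch using java.util.Properties; BrokerConfigs is a hypothetical helper, and the /tmp/kafka-logs-N directories are an assumption (the post instead runs each broker from its own directory, so ./logs already differs):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Properties;

/** Hypothetical helper: per-broker server.properties derived from a shared base. */
final class BrokerConfigs {

    static Properties forBroker(Properties base, int brokerId) {
        Properties p = new Properties();
        p.putAll(base);                                          // shared settings such as zookeeper.connect
        p.setProperty("broker.id", String.valueOf(brokerId));    // must be unique in the cluster
        p.setProperty("port", String.valueOf(9092 + brokerId));  // 9092, 9093, ...
        // Brokers on one host must not share a log directory (assumed location)
        p.setProperty("log.dir", "/tmp/kafka-logs-" + brokerId);
        return p;
    }

    public static void main(String[] args) throws IOException {
        Properties base = new Properties();
        base.load(new StringReader(
                "num.partitions=2\n"
              + "zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183\n"));
        for (int id = 0; id < 2; id++) {
            StringWriter out = new StringWriter();
            forBroker(base, id).store(out, "kafka-" + id);
            System.out.println(out);
        }
    }
}
```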
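You can check how each topic's partitions and replicas are distributed, and whether they are alive, with the following command (sample output: broker ids and topics depend on your cluster; the first line, for example, comes from a three-broker setup):

```shell
> bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: my-replicated-topic partition: 0 leader: 2 replicas: 1,2,0 isr: 2
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
```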
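In that output, replicas lists the brokers assigned to a partition and isr the ones currently in sync with the leader; an ISR shorter than the replica list means a replica has fallen behind or its broker is down. A small sketch that flags such partitions, assuming the exact line format shown above (TopicListChecker is a hypothetical helper, and the format may differ in other Kafka versions):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: finds under-replicated partitions in kafka-list-topic.sh output. */
final class TopicListChecker {

    private static final Pattern LINE = Pattern.compile(
            "topic:\\s*(\\S+)\\s+partition:\\s*(\\d+)\\s+leader:\\s*(\\S+)"
            + "\\s+replicas:\\s*(\\S+)\\s+isr:\\s*(\\S*)");

    /** Returns "topic-partition" for every partition whose ISR is smaller than its replica list. */
    static List<String> underReplicated(List<String> lines) {
        List<String> result = new ArrayList<String>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue; // not a partition line
            }
            int replicas = m.group(4).split(",").length;
            String isr = m.group(5);
            int inSync = isr.isEmpty() ? 0 : isr.split(",").length;
            if (inSync < replicas) {
                result.add(m.group(1) + "-" + m.group(2));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "topic: my-replicated-topic partition: 0 leader: 2 replicas: 1,2,0 isr: 2",
                "topic: test partition: 0 leader: 0 replicas: 0 isr: 0");
        System.out.println(underReplicated(sample)); // [my-replicated-topic-0]
    }
}
```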
The environment is now ready, so let's move on to the programming examples. [Configuration parameters explained]
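III. Project Setup
The project is built with Maven. Frankly, the Kafka Java client is painful to work with, and setting up the build runs into many problems. The pom.xml below is a good reference; the versions of its dependencies must be consistent with each other. If the Kafka client version does not match the Kafka server version you will get all kinds of exceptions, such as "broker id not exists", because the client/server protocol changed when Kafka moved from 0.7 to 0.8. Note that the 2.8.2 in the artifactId kafka_2.8.2 is the Scala version the client was built against (hence scala-library 2.8.2 below); the Kafka version itself is 0.8.0.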
```xml
<dependencies>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.14</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.8.2</artifactId>
        <version>0.8.0</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>
</dependencies>
```
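IV. Producer Code
1) kafka-producer.properties: place this file in the resources directory (the name matches the default location used by KafkaProducerClient)

```properties
#partitioner.class=
## The broker list may be a subset of the Kafka servers: the producer only uses it to fetch metadata.
## Any broker can serve metadata, but listing all brokers here is still recommended.
## The value can also be injected through Spring; one way or the other it must be set.
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
## Synchronous sending; async is recommended
producer.type=sync
# 0 = no compression
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## Only takes effect when producer.type=async
#batch.num.messages=100
```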
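In sync mode each send() call sends its messages right away on the calling thread; in async mode the producer puts messages in an in-memory queue and a background thread sends them in batches, which is faster but loses whatever is still queued if the process dies. A sketch that builds the same settings in code and switches between the two modes (ProducerProps is a hypothetical helper; batch.num.messages=100 follows the commented-out line in the file above, and queue.buffering.max.ms=1000 is an arbitrary assumed value):

```java
import java.util.Properties;

/** Hypothetical helper: producer settings built in code, sync or async. */
final class ProducerProps {

    static Properties build(String brokerList, boolean async) {
        if (brokerList == null || brokerList.trim().isEmpty()) {
            // The 0.8 producer cannot fetch metadata without at least one broker address
            throw new IllegalArgumentException("metadata.broker.list is required");
        }
        Properties p = new Properties();
        p.setProperty("metadata.broker.list", brokerList);
        p.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        p.setProperty("compression.codec", "0"); // no compression
        p.setProperty("producer.type", async ? "async" : "sync");
        if (async) {
            // Only read in async mode: how many messages per batch, and how long to buffer
            p.setProperty("batch.num.messages", "100");
            p.setProperty("queue.buffering.max.ms", "1000");
        }
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build("127.0.0.1:9092,127.0.0.1:9093", true));
    }
}
```

The returned Properties can be passed to new ProducerConfig(...) in place of the file loaded by KafkaProducerClient.init().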
2) Sample KafkaProducerClient.java

```java
package com.test.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * User: guanqing-liu
 */
public class KafkaProducerClient {

    private static final int BATCH_SIZE = 20; // messages per send(List) call

    private Producer<String, String> inner;

    private String brokerList; // for metadata discovery; Spring setter
    private String location = "kafka-producer.properties"; // classpath resource; Spring setter

    private String defaultTopic; // Spring setter

    public void setBrokerList(String brokerList) {
        this.brokerList = brokerList;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public KafkaProducerClient() {}

    public void init() throws Exception {
        Properties properties = new Properties();
        InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(location);
        if (in == null) {
            throw new IOException("classpath resource not found: " + location);
        }
        try {
            properties.load(in);
        } finally {
            in.close();
        }
        // An injected broker list overrides the file; metadata.broker.list must be set either way
        if (brokerList != null) {
            properties.put("metadata.broker.list", brokerList);
        }

        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String message) {
        send(defaultTopic, message);
    }

    public void send(Collection<String> messages) {
        send(defaultTopic, messages);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null || messages.isEmpty()) {
            return;
        }
        // Send in batches of BATCH_SIZE rather than one request per message
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            kms.add(new KeyedMessage<String, String>(topicName, entry));
            if (kms.size() == BATCH_SIZE) {
                inner.send(kms);
                kms.clear();
            }
        }
        if (!kms.isEmpty()) {
            inner.send(kms);
        }
    }

    public void close() {
        if (inner != null) {
            inner.close();
        }
    }

    public static void main(String[] args) {
        KafkaProducerClient producer = null;
        try {
            producer = new KafkaProducerClient();
            producer.setBrokerList("127.0.0.1:9092,127.0.0.1:9093");
            producer.init(); // must run before send(); Spring calls it via init-method
            int i = 0;
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
```
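send(String, Collection) above groups messages into requests of 20. The same rule, pulled out into a generic helper so it can be checked without a broker (Batches is a hypothetical name, not part of the original code):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical helper: splits items into consecutive batches of at most batchSize. */
final class Batches {

    static <T> List<List<T>> split(Collection<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<List<T>>();
        List<T> current = new ArrayList<T>(batchSize);
        for (T item : items) {
            current.add(item);
            if (current.size() == batchSize) {
                batches.add(current);                  // full batch: one send(List) call
                current = new ArrayList<T>(batchSize); // start a new list; the full one is kept
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<Integer>();
        for (int i = 0; i < 45; i++) {
            ids.add(i);
        }
        for (List<Integer> batch : split(ids, 20)) {
            System.out.println(batch.size());
        }
    }
}
```

With 45 messages this yields batches of 20, 20 and 5, i.e. three inner.send(List) calls.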
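3) Spring configuration (the producer talks to the brokers, not to ZooKeeper, so the bean is given brokerList; kafka_broker_list is only an example placeholder name, e.g. 127.0.0.1:9092,127.0.0.1:9093)

```xml
<bean id="kafkaProducerClient" class="com.test.kafka.KafkaProducerClient" init-method="init" destroy-method="close">
    <property name="brokerList" value="${kafka_broker_list}"></property>
    <property name="defaultTopic" value="${kafka_topic}"></property>
</bean>
```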
V. Consumer Code
1) kafka-consumer.properties: place this file in the resources directory (the name matches the default location used by KafkaConsumerClient)

```properties
## This value can be set here or injected through Spring; one way or the other it must be set
##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
# consumer group id
group.id=test-group
# consumer timeout: if set, the iterator throws ConsumerTimeoutException when no message arrives in time
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000
```
2) Sample KafkaConsumerClient.java

```java
package com.test.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

/**
 * User: guanqing-liu
 */
public class KafkaConsumerClient {

    private String groupid;   // can be set by Spring
    private String zkConnect; // can be set by Spring
    private String location = "kafka-consumer.properties"; // config file on the classpath
    private String topic;
    private int partitionsNum = 1; // number of streams (threads); streams beyond the partition count stay idle
    private MessageExecutor executor; // message listener
    private ExecutorService threadPool;

    private ConsumerConnector connector;

    private Charset charset = Charset.forName("UTF-8");

    public void setGroupid(String groupid) {
        this.groupid = groupid;
    }

    public void setZkConnect(String zkConnect) {
        this.zkConnect = zkConnect;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setPartitionsNum(int partitionsNum) {
        this.partitionsNum = partitionsNum;
    }

    public void setExecutor(MessageExecutor executor) {
        this.executor = executor;
    }

    public KafkaConsumerClient() {}

    // Initialize the consumer, connect, and start the listener threads
    public void init() throws Exception {
        if (executor == null) {
            throw new IllegalStateException("KafkaConsumer: executor can't be null!");
        }
        Properties properties = new Properties();
        InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(location);
        if (in == null) {
            throw new IOException("classpath resource not found: " + location);
        }
        try {
            properties.load(in);
        } finally {
            in.close();
        }

        if (groupid != null) {
            properties.put("group.id", groupid);
        }
        if (zkConnect != null) {
            properties.put("zookeeper.connect", zkConnect);
        }
        ConsumerConfig config = new ConsumerConfig(properties);

        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum); // request partitionsNum streams for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum * 2);

        // start one runner per stream
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            if (threadPool != null) {
                threadPool.shutdownNow();
            }
        } catch (Exception e) {
            // ignore
        } finally {
            if (connector != null) {
                connector.shutdown();
            }
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                // connector.commitOffsets(); // commit offsets manually when auto.commit.enable=false
                MessageAndMetadata<byte[], byte[]> item = it.next();
                try {
                    executor.execute(new String(item.message(), charset)); // decoded as UTF-8
                } catch (Exception e) {
                    // exceptions thrown by execute() are swallowed here
                }
            }
        }

        public String getContent(Message message) {
            ByteBuffer buffer = message.payload();
            if (buffer.remaining() == 0) {
                return null;
            }
            CharBuffer charBuffer = charset.decode(buffer);
            return charBuffer.toString();
        }
    }

    public static interface MessageExecutor {

        public void execute(String message);
    }

    public static void main(String[] args) {
        MessageExecutor executor = new MessageExecutor() {
            public void execute(String message) {
                System.out.println(message);
            }
        };
        final KafkaConsumerClient consumer = new KafkaConsumerClient();
        consumer.setZkConnect("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
        consumer.setTopic("test-topic");
        consumer.setPartitionsNum(2);
        consumer.setExecutor(executor);
        // Release the connector when the JVM exits (e.g. on Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                consumer.close();
            }
        });
        try {
            // init() returns immediately; the non-daemon listener threads keep the JVM running
            consumer.init();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
```
3) Spring configuration (omitted)
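Note that the KafkaConsumerClient class above pays little attention to error handling, in particular to what happens when MessageExecutor.execute() throws an exception: MessageRunner silently swallows it, and because auto.commit.enable=true the offset of the failed message is still committed later, so that message is effectively lost.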
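One way to make such failures visible is to wrap the business logic in an executor that retries a few times and counts what still fails. A minimal sketch: MessageHandler is a stand-in with the same single method as the post's MessageExecutor, so the example compiles without the Kafka jars (in the real project RetryingHandler would implement KafkaConsumerClient.MessageExecutor instead); the class names, the retry count and logging to System.err are all hypothetical choices:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-in for KafkaConsumerClient.MessageExecutor (same single method). */
interface MessageHandler {
    void execute(String message);
}

/** Hypothetical wrapper: retries the delegate and counts messages that still fail. */
final class RetryingHandler implements MessageHandler {
    private final MessageHandler delegate;
    private final int maxAttempts;
    // Several MessageRunner threads may call execute() concurrently
    private final AtomicInteger failed = new AtomicInteger();

    RetryingHandler(MessageHandler delegate, int maxAttempts) {
        this.delegate = delegate;
        this.maxAttempts = maxAttempts;
    }

    public void execute(String message) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                delegate.execute(message);
                return; // processed successfully
            } catch (RuntimeException e) {
                System.err.println("attempt " + attempt + " failed for [" + message + "]: " + e);
            }
        }
        // Still failing: with auto-commit its offset will be committed anyway,
        // so this is the place to save the message elsewhere instead of losing it
        failed.incrementAndGet();
    }

    int failedCount() {
        return failed.get();
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Fails twice, then succeeds on the third attempt
        RetryingHandler handler = new RetryingHandler(new MessageHandler() {
            public void execute(String message) {
                if (++calls[0] < 3) {
                    throw new IllegalStateException("transient failure");
                }
                System.out.println("processed " + message);
            }
        }, 3);
        handler.execute("hello");
        System.out.println("failed: " + handler.failedCount());
    }
}
```

Retrying inside execute() blocks that stream's thread until it gives up, so maxAttempts should stay small.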
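When testing, start the consumer before the producer so that you can watch new messages arrive in real time: a consumer group with no committed offset starts from the newest message by default (auto.offset.reset=largest), so messages sent before it first connects are not shown.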
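If you would rather not depend on startup order, a group with no committed offset can be told to start from the oldest available message with auto.offset.reset=smallest. A sketch that builds the consumer settings in code (ConsumerProps is a hypothetical helper; the other values mirror the consumer properties file above, and the reset setting only applies while the group has no valid committed offset):

```java
import java.util.Properties;

/** Hypothetical helper: 0.8 high-level consumer settings built in code. */
final class ConsumerProps {

    static Properties build(String zkConnect, String groupId, boolean fromBeginning) {
        Properties p = new Properties();
        p.setProperty("zookeeper.connect", zkConnect); // required
        p.setProperty("group.id", groupId);            // required
        p.setProperty("zookeeper.connection.timeout.ms", "1000000");
        p.setProperty("auto.commit.enable", "true");
        p.setProperty("auto.commit.interval.ms", "60000");
        // Where to start when the group has no committed offset:
        // "smallest" = oldest available message, "largest" (the default) = newest
        p.setProperty("auto.offset.reset", fromBeginning ? "smallest" : "largest");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", "test-group", true));
    }
}
```

Like ProducerProps, the result would be passed to new ConsumerConfig(...) instead of loading the properties file.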