jingjihui 发表于 2017-5-23 17:49:19

(转)Kafka部署与代码实例

  转自:http://shift-alt-ctrl.iteye.com/blog/1930791
  kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.
  我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.
  其中kafka为0.8V,zookeeper为3.4.5V
  一.Zookeeper集群构建
  我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.
  1) zk-0
  调整配置文件:
Php代码  


[*]clientPort=2181  
[*]server.0=127.0.0.1:2888:3888  
[*]server.1=127.0.0.1:2889:3889  
[*]server.2=127.0.0.1:2890:3890  
[*]##只需要修改上述配置,其他配置保留默认值  

  启动zookeeper
Java代码  


[*]./zkServer.sh start  

  2) zk-1
  调整配置文件(其他配置和zk-0一只):
Php代码  


[*]clientPort=2182  
[*]##只需要修改上述配置,其他配置保留默认值  

  启动zookeeper
Java代码  


[*]./zkServer.sh start  

  3) zk-2
  调整配置文件(其他配置和zk-0一只):
Php代码  


[*]clientPort=2183  
[*]##只需要修改上述配置,其他配置保留默认值  

  启动zookeeper
Java代码  


[*]./zkServer.sh start  

  二. Kafka集群构建
  因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.
  1) kafka-0
  在config目录下修改配置文件为:
Java代码  


[*]broker.id=0  
[*]port=9092  
[*]num.network.threads=2  
[*]num.io.threads=2  
[*]socket.send.buffer.bytes=1048576  
[*]socket.receive.buffer.bytes=1048576  
[*]socket.request.max.bytes=104857600  
[*]log.dir=./logs  
[*]num.partitions=2  
[*]log.flush.interval.messages=10000  
[*]log.flush.interval.ms=1000  
[*]log.retention.hours=168  
[*]#log.retention.bytes=1073741824  
[*]log.segment.bytes=536870912  
[*]##replication机制,让每个topic的partitions在kafka-cluster中备份2个  
[*]##用来提高cluster的容错能力..  
[*]default.replication.factor=1  
[*]log.cleanup.interval.mins=10  
[*]zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
[*]zookeeper.connection.timeout.ms=1000000  

  因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。
Java代码  


[*]> cd kafka-0  
[*]> ./sbt update  
[*]> ./sbt package  
[*]> ./sbt assembly-package-dependency   

  其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:
Java代码  


[*]> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &  

  因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.
  2) kafka-1
Java代码  


[*]broker.id=1  
[*]port=9093  
[*]##其他配置和kafka-0保持一致  

  然后和kafka-0一样执行打包命令,然后启动此broker.
Java代码  


[*]> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &  

  仍然可以通过如下指令查看topic的"partition"/"replicas"的分布和存活情况.
Java代码  


[*]> bin/kafka-list-topic.sh --zookeeper localhost:2181  
[*]topic: my-replicated-topic  partition: 0    leader: 2   replicas: 1,2,0 isr: 2  
[*]topic: test partition: 0    leader: 0   replicas: 0 isr: 0   

  到目前为止环境已经OK了,那我们就开始展示编程实例吧。[配置参数详解]
  三.项目准备
  项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.
Java代码  


[*]<dependencies>  
[*]    <dependency>  
[*]        <groupId>log4j</groupId>  
[*]        <artifactId>log4j</artifactId>  
[*]        <version>1.2.14</version>  
[*]    </dependency>  
[*]    <dependency>  
[*]        <groupId>org.apache.kafka</groupId>  
[*]        <artifactId>kafka_2.8.2</artifactId>  
[*]        <version>0.8.0</version>  
[*]        <exclusions>  
[*]            <exclusion>  
[*]                <groupId>log4j</groupId>  
[*]                <artifactId>log4j</artifactId>  
[*]            </exclusion>  
[*]        </exclusions>  
[*]    </dependency>  
[*]    <dependency>  
[*]        <groupId>org.scala-lang</groupId>  
[*]        <artifactId>scala-library</artifactId>  
[*]        <version>2.8.2</version>  
[*]    </dependency>  
[*]    <dependency>  
[*]        <groupId>com.yammer.metrics</groupId>  
[*]        <artifactId>metrics-core</artifactId>  
[*]        <version>2.2.0</version>  
[*]    </dependency>  
[*]    <dependency>  
[*]        <groupId>com.101tec</groupId>  
[*]        <artifactId>zkclient</artifactId>  
[*]        <version>0.3</version>  
[*]    </dependency>  
[*]</dependencies>  

  四.Producer端代码
  1) producer.properties文件:此文件放在/resources目录下
Java代码  


[*]#partitioner.class=  
[*]##broker列表可以为kafka server的子集,因为producer需要从broker中获取metadata  
[*]##尽管每个broker都可以提供metadata,此处还是建议,将所有broker都列举出来  
[*]##此值,我们可以在spring中注入过来  
[*]##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093  
[*]##,127.0.0.1:9093  
[*]##同步,建议为async  
[*]producer.type=sync  
[*]compression.codec=0  
[*]serializer.class=kafka.serializer.StringEncoder  
[*]##在producer.type=async时有效  
[*]#batch.num.messages=100  

  2) KafkaProducerClient.java代码样例
Java代码  


[*]import java.util.ArrayList;  
[*]import java.util.Collection;  
[*]import java.util.List;  
[*]import java.util.Properties;  
[*]  
[*]import kafka.javaapi.producer.Producer;  
[*]import kafka.producer.KeyedMessage;  
[*]import kafka.producer.ProducerConfig;  
[*]  
[*]/** 
[*] * User: guanqing-liu 
[*] */  
[*]public class KafkaProducerClient {  
[*]  
[*]    private Producer<String, String> inner;  
[*]      
[*]    private String brokerList;//for metadata discovery,spring setter  
[*]    private String location = "kafka-producer.properties";//spring setter  
[*]      
[*]    private String defaultTopic;//spring setter  
[*]  
[*]    public void setBrokerList(String brokerList) {  
[*]        this.brokerList = brokerList;  
[*]    }  
[*]  
[*]    public void setLocation(String location) {  
[*]        this.location = location;  
[*]    }  
[*]  
[*]    public void setDefaultTopic(String defaultTopic) {  
[*]        this.defaultTopic = defaultTopic;  
[*]    }  
[*]  
[*]    public KafkaProducerClient(){}  
[*]      
[*]    public void init() throws Exception {  
[*]        Properties properties = new Properties();  
[*]        properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));  
[*]          
[*]          
[*]        if(brokerList != null) {  
[*]            properties.put("metadata.broker.list", brokerList);  
[*]        }  
[*]  
[*]        ProducerConfig config = new ProducerConfig(properties);  
[*]        inner = new Producer<String, String>(config);  
[*]    }  
[*]  
[*]    public void send(String message){  
[*]        send(defaultTopic,message);  
[*]    }  
[*]      
[*]    public void send(Collection<String> messages){  
[*]        send(defaultTopic,messages);  
[*]    }  
[*]      
[*]    public void send(String topicName, String message) {  
[*]        if (topicName == null || message == null) {  
[*]            return;  
[*]        }  
[*]        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);  
[*]        inner.send(km);  
[*]    }  
[*]  
[*]    public void send(String topicName, Collection<String> messages) {  
[*]        if (topicName == null || messages == null) {  
[*]            return;  
[*]        }  
[*]        if (messages.isEmpty()) {  
[*]            return;  
[*]        }  
[*]        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();  
[*]        int i= 0;  
[*]        for (String entry : messages) {  
[*]            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);  
[*]            kms.add(km);  
[*]            i++;  
[*]            if(i % 20 == 0){  
[*]                inner.send(kms);  
[*]                kms.clear();  
[*]            }  
[*]        }  
[*]          
[*]        if(!kms.isEmpty()){  
[*]            inner.send(kms);  
[*]        }  
[*]    }  
[*]  
[*]    public void close() {  
[*]        inner.close();  
[*]    }  
[*]  
[*]    /** 
[*]     * @param args 
[*]     */  
[*]    public static void main(String[] args) {  
[*]        KafkaProducerClient producer = null;  
[*]        try {  
[*]            producer = new KafkaProducerClient();  
[*]            //producer.setBrokerList("");  
[*]            int i = 0;  
[*]            while (true) {  
[*]                producer.send("test-topic", "this is a sample" + i);  
[*]                i++;  
[*]                Thread.sleep(2000);  
[*]            }  
[*]        } catch (Exception e) {  
[*]            e.printStackTrace();  
[*]        } finally {  
[*]            if (producer != null) {  
[*]                producer.close();  
[*]            }  
[*]        }  
[*]  
[*]    }  
[*]  
[*]}  

  3) spring配置
Java代码  


[*]<bean id="kafkaProducerClient" class="com.test.kafka.KafkaProducerClient" init-method="init" destroy-method="close">  
[*]    <property name="zkConnect" value="${zookeeper_cluster}"></property>  
[*]    <property name="defaultTopic" value="${kafka_topic}"></property>  
[*]</bean>  

  五.Consumer端
  1) consumer.properties:文件位于/resources目录下
Java代码  


[*]## 此值可以配置,也可以通过spring注入  
[*]##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
[*]##,127.0.0.1:2182,127.0.0.1:2183  
[*]# timeout in ms for connecting to zookeeper  
[*]zookeeper.connectiontimeout.ms=1000000  
[*]#consumer group id  
[*]group.id=test-group  
[*]#consumer timeout  
[*]#consumer.timeout.ms=5000  
[*]auto.commit.enable=true  
[*]auto.commit.interval.ms=60000  

  2) KafkaConsumerClient.java代码样例
Java代码  


[*]package com.test.kafka;  
[*]import java.nio.ByteBuffer;  
[*]import java.nio.CharBuffer;  
[*]import java.nio.charset.Charset;  
[*]import java.util.HashMap;  
[*]import java.util.List;  
[*]import java.util.Map;  
[*]import java.util.Properties;  
[*]import java.util.concurrent.ExecutorService;  
[*]import java.util.concurrent.Executors;  
[*]  
[*]import kafka.consumer.Consumer;  
[*]import kafka.consumer.ConsumerConfig;  
[*]import kafka.consumer.ConsumerIterator;  
[*]import kafka.consumer.KafkaStream;  
[*]import kafka.javaapi.consumer.ConsumerConnector;  
[*]import kafka.message.Message;  
[*]import kafka.message.MessageAndMetadata;  
[*]  
[*]/** 
[*] * User: guanqing-liu  
[*] */  
[*]public class KafkaConsumerClient {  
[*]  
[*]    private String groupid; //can be setting by spring  
[*]    private String zkConnect;//can be setting by spring  
[*]    private String location = "kafka-consumer.properties";//配置文件位置  
[*]    private String topic;  
[*]    private int partitionsNum = 1;  
[*]    private MessageExecutor executor; //message listener  
[*]    private ExecutorService threadPool;  
[*]      
[*]    private ConsumerConnector connector;  
[*]      
[*]    private Charset charset = Charset.forName("utf8");  
[*]  
[*]    public void setGroupid(String groupid) {  
[*]        this.groupid = groupid;  
[*]    }  
[*]  
[*]    public void setZkConnect(String zkConnect) {  
[*]        this.zkConnect = zkConnect;  
[*]    }  
[*]  
[*]    public void setLocation(String location) {  
[*]        this.location = location;  
[*]    }  
[*]  
[*]    public void setTopic(String topic) {  
[*]        this.topic = topic;  
[*]    }  
[*]  
[*]    public void setPartitionsNum(int partitionsNum) {  
[*]        this.partitionsNum = partitionsNum;  
[*]    }  
[*]  
[*]    public void setExecutor(MessageExecutor executor) {  
[*]        this.executor = executor;  
[*]    }  
[*]  
[*]    public KafkaConsumerClient() {}  
[*]  
[*]    //init consumer,and start connection and listener  
[*]    public void init() throws Exception {  
[*]        if(executor == null){  
[*]            throw new RuntimeException("KafkaConsumer,exectuor cant be null!");  
[*]        }  
[*]        Properties properties = new Properties();  
[*]        properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));  
[*]          
[*]        if(groupid != null){  
[*]            properties.put("groupid", groupid);  
[*]        }  
[*]        if(zkConnect != null){  
[*]            properties.put("zookeeper.connect", zkConnect);  
[*]        }  
[*]        ConsumerConfig config = new ConsumerConfig(properties);  
[*]  
[*]        connector = Consumer.createJavaConsumerConnector(config);  
[*]        Map<String, Integer> topics = new HashMap<String, Integer>();  
[*]        topics.put(topic, partitionsNum);  
[*]        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);  
[*]        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);  
[*]        threadPool = Executors.newFixedThreadPool(partitionsNum * 2);  
[*]          
[*]        //start  
[*]        for (KafkaStream<byte[], byte[]> partition : partitions) {  
[*]            threadPool.execute(new MessageRunner(partition));  
[*]        }  
[*]    }  
[*]  
[*]    public void close() {  
[*]        try {  
[*]            threadPool.shutdownNow();  
[*]        } catch (Exception e) {  
[*]            //  
[*]        } finally {  
[*]            connector.shutdown();  
[*]        }  
[*]  
[*]    }  
[*]  
[*]    class MessageRunner implements Runnable {  
[*]        private KafkaStream<byte[], byte[]> partition;  
[*]  
[*]        MessageRunner(KafkaStream<byte[], byte[]> partition) {  
[*]            this.partition = partition;  
[*]        }  
[*]  
[*]        public void run() {  
[*]            ConsumerIterator<byte[], byte[]> it = partition.iterator();  
[*]            while (it.hasNext()) {  
[*]                // connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用  
[*]                MessageAndMetadata<byte[], byte[]> item = it.next();  
[*]                try{  
[*]                    executor.execute(new String(item.message(),charset));// UTF-8,注意异常  
[*]                }catch(Exception e){  
[*]                    //  
[*]                }  
[*]            }  
[*]        }  
[*]          
[*]        public String getContent(Message message){  
[*]            ByteBuffer buffer = message.payload();  
[*]            if (buffer.remaining() == 0) {  
[*]                return null;  
[*]            }  
[*]            CharBuffer charBuffer = charset.decode(buffer);  
[*]            return charBuffer.toString();  
[*]        }  
[*]    }  
[*]  
[*]    public static interface MessageExecutor {  
[*]  
[*]        public void execute(String message);  
[*]    }  
[*]  
[*]    /** 
[*]     * @param args 
[*]     */  
[*]    public static void main(String[] args) {  
[*]        KafkaConsumerClient consumer = null;  
[*]        try {  
[*]            MessageExecutor executor = new MessageExecutor() {  
[*]  
[*]                public void execute(String message) {  
[*]                    System.out.println(message);  
[*]                }  
[*]            };  
[*]            consumer = new KafkaConsumerClient();  
[*]              
[*]            consumer.setTopic("test-topic");  
[*]            consumer.setPartitionsNum(2);  
[*]            consumer.setExecutor(executor);  
[*]            consumer.init();  
[*]        } catch (Exception e) {  
[*]            e.printStackTrace();  
[*]        } finally {  
[*]             if(consumer != null){  
[*]                 consumer.close();  
[*]             }  
[*]        }  
[*]  
[*]    }  
[*]  
[*]}  

  3) spring配置(略)
  需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.
  在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。



[*]test-kafka.zip (5.3 KB)
[*]下载次数: 57
页: [1]
查看完整版本: (转)Kafka部署与代码实例