kafka安装测试过程
kafka的性能在此不再赘述,百度一下很多,在此描述一下kafka的安装和测试过程:
安装kafka:
#tar -xzf kafka_2.9.2-0.8.1.tgz
#cd kafka_2.9.2-0.8.1
#mv kafka_2.9.2-0.8.1 kafka
开启zookeeper服务:
bin/zookeeper-server-start.sh config/zookeeper.properties
开启kafka服务:
bin/kafka-server-start.sh config/server.properties
创建话题topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
具体kafka-topics.sh 的参数自行查看--help帮助
查看kafka服务中的topics:
bin/kafka-topics.sh --list --zookeeper localhost:2181
#列出topics如下
test
在2.8之前的版本中的shell脚本可能不同
打开produce,向test话题添加消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
xxxxxxxxxxxxxxxxx #输入内容后enter即可发送出消息内容
打开customer读取test话题内容:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
xxxxxxxxxxxxxxxxx
kafka的是scala语言编写的服务框架,因此用scala开发produce和custome应用程序应该是非常方便的,但是没有找到相应examples,但kafka也支持java和python以及c编写的客户端应用程序,下面分享一下java的代码片段(网络转载):
- 消费者custome:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerTest extends Thread {
private final ConsumerConnector consumer;
private final String topic;
public static void main(String[] args) {
ConsumerTest consumerThread = new ConsumerTest("1test");
consumerThread.start();
}
public ConsumerTest(String topic) {
consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", "master:2181");
props.put("group.id", "0");
props.put("zookeeper.session.timeout.ms", "400000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext())
System.out.println(new String(it.next().message()));
}
}
消息的生产者produce:
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class ProducerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zk.connect", "master:2181"); // zookeeper的一个节点地址
props.put("serializer.class", "kafka.serializer.StringEncoder");// kafka序列化方式
props.put("metadata.broker.list", "master:9092");
props.put("request.required.acks", "1");
//props.put("partitioner.class", "com.xq.SimplePartitioner");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String msg ="this is a messageuuu! XXXmessDageuuu";
KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", msg);
for(int i = 0 ; i < 5; i ++){
System.out.println("send"+i);
producer.send(data);
}
producer.close();
}
}
分别运行custom和produce即可看到控制台消息发送和接受的内容。
- 后续将继续更新kafka的各个参数的说明文档以及与spark集成,与flume集成。
|