|
我使用的kafka版本 kafka_2.9.2-0.8.1.1.tgz
参考了官网手册 http://kafka.apache.org/documentation.html#quickstart
和 http://blog.csdn.net/hxpjava1/article/details/19160665 (该文使用的kafka版本低一些,里面有些代码与本版本不兼容)
1.下载并解压kafka 地址 http://mirrors.hust.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1
2.启动服务
首先要启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafka
bin/kafka-server-start.sh config/server.properties &
3.创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看是否创建成功
bin/kafka-topics.sh --list --zookeeper localhost:2181
4.发送消息
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Minimal Kafka 0.8 producer example: sends a single keyed string message to
 * the "test" topic and then exits.
 */
public class TestProducer {
    /**
     * Configures a producer, sends one message and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker used to fetch topic metadata; test.kafka.com must resolve to the Kafka host.
        props.put("metadata.broker.list", "test.kafka.com:9092");
        // Serialize messages as strings.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Require an acknowledgement for each request.
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("test", "key", "测试");
            producer.send(data);
        } finally {
            // Always release the producer, even when send() fails
            // (e.g. with FailedToSendMessageException).
            producer.close();
        }
        System.out.println("结束");
    }
}
5.接收消息
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
 * Kafka 0.8 high-level consumer example: subscribes to the "test" topic with
 * several streams and prints the topic, key and content of every message.
 */
public class ConsumerSample {
    /** Topic to consume from. */
    private static final String TOPIC = "test";

    /** Number of streams requested for the topic; also the size of the worker pool. */
    private static final int STREAM_COUNT = 4;

    /** Charset used to decode keys and payloads, instead of the platform default. */
    private static final java.nio.charset.Charset UTF_8 =
            java.nio.charset.Charset.forName("UTF-8");

    /**
     * Connects to ZooKeeper, opens the message streams and starts one printing
     * task per stream. The tasks run until the JVM is shut down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // specify some consumer properties
        Properties props = new Properties();
        props.put("group.id", "test-consumer-group");
        props.put("zookeeper.connect", "test.kafka.com:2181");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        final ConsumerConnector consumerConnector =
                Consumer.createJavaConsumerConnector(consumerConfig);

        // request STREAM_COUNT streams for topic "test", to allow that many threads to consume
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, STREAM_COUNT);
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
                consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(TOPIC);

        // one worker thread per stream
        final ExecutorService executor = Executors.newFixedThreadPool(STREAM_COUNT);

        // Stop the connector and the pool when the JVM exits (e.g. Ctrl-C).
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumerConnector.shutdown();
                executor.shutdown();
            }
        });

        // consume the messages in the threads
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) {
                        System.out.println("topic:" + msgAndMetadata.topic());
                        String content = decode(msgAndMetadata.message());
                        // The key is null for messages sent without one; decode() tolerates
                        // that instead of killing this worker with a NullPointerException.
                        System.out.println("message key: " + decode(msgAndMetadata.key()));
                        System.out.println("message content: " + content);
                    }
                }
            });
        }
    }

    /**
     * Decodes a byte array as UTF-8 text.
     *
     * @param bytes raw bytes, may be {@code null}
     * @return the decoded string, or {@code null} when {@code bytes} is {@code null}
     */
    private static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, UTF_8);
    }
}
6.注意的地方
test.kafka.com 是通过域名映射(例如hosts文件)配置的主机名,请把它映射到自己的kafka服务器的ip地址
如果发送消息失败,请检查服务器的防火墙是否已关闭或已放行kafka端口(本例为9092)
对于group.id可以查看config/consumer.properties的配置
7.如果出现FailedToSendMessageException: Failed to send messages after 3 tries错误
修改config/server.properties,将连接zookeeper的配置改为
zookeeper.connect=127.0.0.1:2181
配置的时候最好通过域名映射添加topic
8.maven配置文件
<!-- Maven dependencies for building the producer and consumer examples in this document. -->
<dependencies>
<!-- Test-scope only; neither example in this document uses JUnit. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- NOTE(review): not referenced by either example in this document; confirm it is needed. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.0.Final</version>
</dependency>
<!-- Logging library; presumably required by the Kafka client at runtime (TODO confirm). -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<!-- Kafka 0.8.1.1 built for Scala 2.9.2; supplies the kafka.* classes imported by the examples. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
</dependency>
<!-- NOTE(review): the Kafka artifactId targets Scala 2.9.2; confirm 2.9.3 is intended here. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.9.3</version>
</dependency>
<!-- Presumably a runtime dependency of the Kafka client (TODO confirm it must be declared explicitly). -->
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<!-- ZooKeeper client; presumably used by the consumer's ZooKeeper connection (TODO confirm). -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
</dependencies>
|
|
|