|
After sending messages on a timer, the result is as follows:
Kafka code download
- 15. Install Kafka
- cd /usr/local/
- wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
- tar xf kafka_2.10-0.10.0.0.tgz
- ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
- chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
- chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
-
- /usr/local/zookeeper/bin/zkCli.sh
- create /kafka ''
-
- vim /usr/local/kafka/config/server.properties
- broker.id=0
- zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka
-
- scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
- scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/
-
- scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
- scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties
-
- Start the broker on the master and slave nodes
- /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
- Create the topic
- /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
- /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic
-
-
- /usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic
-
- /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic
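The `zookeeper.connect` setting and the `--zookeeper` arguments above all follow one pattern: a comma-separated list of `host:port` pairs, with the `/kafka` chroot appended once at the very end rather than after each host. A minimal Java sketch of building that string follows; the `ZkConnect` class is a hypothetical helper for illustration and is not part of Kafka.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Kafka): builds a ZooKeeper connect string with a chroot. */
final class ZkConnect {

    /** Joins host:port pairs with commas and appends the chroot path once, at the end. */
    static String build(List<String> hosts, int port, String chroot) {
        StringBuilder sb = new StringBuilder();
        for (String host : hosts) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(host).append(':').append(port);
        }
        return sb.append(chroot).toString();
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList(
                "dev10.aoiplus.openpf", "dev06.aoiplus.openpf", "dev05.aoiplus.openpf");
        // Prints the same value that server.properties uses for zookeeper.connect
        System.out.println(build(hosts, 2181, "/kafka"));
    }
}
```

The chroot must already exist in ZooKeeper, which is what the `create /kafka ''` step in `zkCli.sh` takes care of.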
Testing after the installation is complete
Producer
Consumer
Spring receiving messages
Code
applicationContext-kafka-productor.xml
- (XML markup not preserved; only the following property values survive, in order)
- 3600000
- 5
- kafka.serializer.StringEncoder
- 1
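Only four values of the producer configuration are visible in this listing. They look like settings of the old Scala producer that spring-integration-kafka 1.3.0 wraps. As a hedged sketch, the same settings are expressed below as `java.util.Properties`. The key names are an assumption (they are real old-producer configuration keys, but the source does not show which key belongs to which value), and the `ProducerProps` class is hypothetical.

```java
import java.util.Properties;

/** Sketch only: key names are assumed; the original listing shows just the four values. */
final class ProducerProps {

    static Properties build() {
        Properties props = new Properties();
        // Assumed key: how often topic metadata is refreshed, in milliseconds
        props.setProperty("topic.metadata.refresh.interval.ms", "3600000");
        // Assumed key: number of retries for a failed send
        props.setProperty("message.send.max.retries", "5");
        // Assumed key: encoder class for message payloads
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // Assumed key: 1 = wait for the partition leader to acknowledge
        props.setProperty("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        build().list(System.out);
    }
}
```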
applicationContext-kafka-consumer.xml
- (XML markup not preserved; only the following property values survive, in order)
- smallest
- 10485760
- 5242880
- 1000
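The consumer configuration likewise shows only its values. A hedged sketch of what they probably configure follows, assuming they map to the old high-level consumer keys below. The pairing of keys and values is an assumption, and `ConsumerProps` is a hypothetical class.

```java
import java.util.Properties;

/** Sketch only: key names are assumed; the original listing shows just the four values. */
final class ConsumerProps {

    static Properties build() {
        Properties props = new Properties();
        // Assumed key: start from the earliest offset when no committed offset exists
        props.setProperty("auto.offset.reset", "smallest");
        // Assumed key: socket receive buffer of 10 MB
        props.setProperty("socket.receive.buffer.bytes", "10485760");
        // Assumed key: maximum bytes fetched per partition request, 5 MB
        props.setProperty("fetch.message.max.bytes", "5242880");
        // Assumed key: commit consumed offsets every second
        props.setProperty("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        build().list(System.out);
    }
}
```

With `smallest`, a consumer group that has no stored offset reads the topic from the beginning, which matches the `--from-beginning` flag used with the console consumer.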
KafkaConsumerService
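- import java.util.Map;
-
- import org.springframework.stereotype.Service;
-
- @Service
- public class KafkaConsumerService {
-
-     // msgs maps a topic name to an inner map of messages;
-     // the Integer key type of the inner map is assumed.
-     public void process(Map<String, Map<Integer, String>> msgs) {
-         for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
-             System.out.println("======================================Consumer Message received: ");
-             System.out.println("=====================================Suchit Topic:" + entry.getKey());
-             // Print every message received for this topic
-             for (String msg : entry.getValue().values()) {
-                 System.out.println("================================Suchit Consumed Message: " + msg);
-             }
-         }
-     }
-
- }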
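To try the traversal in `process` without a running broker, the nested map can be built by hand. The sketch below uses only JDK collections and assumes the payload shape topic -> (partition -> message). The `ConsumerPayloadDemo` class and its `flatten` method are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical demo: mirrors the loop in KafkaConsumerService.process without Spring or Kafka. */
final class ConsumerPayloadDemo {

    /** Flattens topic -> (partition -> message) into "topic[partition]: message" lines. */
    static List<String> flatten(Map<String, Map<Integer, String>> msgs) {
        List<String> lines = new ArrayList<String>();
        for (Map.Entry<String, Map<Integer, String>> topic : msgs.entrySet()) {
            for (Map.Entry<Integer, String> part : topic.getValue().entrySet()) {
                lines.add(topic.getKey() + "[" + part.getKey() + "]: " + part.getValue());
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the printed order is predictable
        Map<Integer, String> byPartition = new LinkedHashMap<Integer, String>();
        byPartition.put(0, "hello");
        byPartition.put(3, "world");
        Map<String, Map<Integer, String>> msgs =
                new LinkedHashMap<String, Map<Integer, String>>();
        msgs.put("baoy-topic", byPartition);
        for (String line : flatten(msgs)) {
            System.out.println(line);
        }
    }
}
```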
KafkaProductorService
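- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.integration.kafka.support.KafkaHeaders;
- import org.springframework.messaging.MessageChannel;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Service;
-
- @Service
- public class KafkaProductorService {
-
-     // Channel bound to the Kafka outbound adapter, looked up by bean name "pChannel"
-     @Autowired
-     @Qualifier("pChannel")
-     private MessageChannel messageChannel;
-
-     // Wraps obj in a message and routes it to the given topic via the TOPIC header
-     public void sendInfo(String topic, Object obj) {
-         System.out.println("---Service:KafkaService------sendInfo------");
-         messageChannel.send(MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.TOPIC, topic).build());
-     }
-
- }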
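The timer that drives the sending is not shown in the post. As a rough sketch of one way to do it, the example below uses the JDK `ScheduledExecutorService` to call a sender at a fixed rate. The `TimedSender` class and its `Sender` interface are hypothetical stand-ins for `KafkaProductorService.sendInfo`, so the sketch runs without Spring or a broker; the author's actual timer may differ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: sends a fixed number of messages on a timer. */
final class TimedSender {

    /** Stand-in for KafkaProductorService.sendInfo(topic, obj). */
    interface Sender {
        void sendInfo(String topic, Object obj);
    }

    /** Sends count messages, one every periodMillis, then stops the timer. */
    static void sendPeriodically(final Sender sender, final String topic,
                                 final int count, long periodMillis) {
        final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        final CountDownLatch done = new CountDownLatch(count);
        final AtomicInteger seq = new AtomicInteger();
        timer.scheduleAtFixedRate(new Runnable() {
            public void run() {
                int n = seq.incrementAndGet();
                if (n <= count) { // guard against an extra tick before shutdown
                    sender.sendInfo(topic, "message-" + n);
                    done.countDown();
                }
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(); // block until all messages have been handed to the sender
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        sendPeriodically(new Sender() {
            public void sendInfo(String topic, Object obj) {
                System.out.println(topic + " <- " + obj);
            }
        }, "baoy-topic", 3, 100L);
    }
}
```

In the real application the anonymous `Sender` would simply delegate to the injected `KafkaProductorService`.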
pom
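- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-   <modelVersion>4.0.0</modelVersion>
-   <groupId>com.curiousby.baoyou.cn</groupId>
-   <artifactId>SpringKafkaDEMO</artifactId>
-   <packaging>war</packaging>
-   <version>0.0.1-SNAPSHOT</version>
-   <name>SpringKafkaDEMO Maven Webapp</name>
-   <url>http://maven.apache.org</url>
-
-   <properties>
-     <spring.version>4.2.5.RELEASE</spring.version>
-   </properties>
-
-   <dependencies>
-     <dependency>
-       <groupId>junit</groupId>
-       <artifactId>junit</artifactId>
-       <version>4.7</version>
-       <type>jar</type>
-       <scope>test</scope>
-     </dependency>
-     <dependency>
-       <groupId>org.dbunit</groupId>
-       <artifactId>dbunit</artifactId>
-       <version>2.4.9</version>
-       <scope>test</scope>
-     </dependency>
-     <dependency>
-       <groupId>com.github.springtestdbunit</groupId>
-       <artifactId>spring-test-dbunit</artifactId>
-       <version>1.1.0</version>
-       <scope>test</scope>
-     </dependency>
-     <dependency>
-       <groupId>org.springframework</groupId>
-       <artifactId>spring-test</artifactId>
-       <version>${spring.version}</version>
-       <scope>test</scope>
-     </dependency>
-
-     <dependency>
-       <groupId>javax.servlet</groupId>
-       <artifactId>javax.servlet-api</artifactId>
-       <version>3.1.0</version>
-       <scope>provided</scope>
-     </dependency>
-     <dependency>
-       <groupId>org.aspectj</groupId>
-       <artifactId>aspectjrt</artifactId>
-       <version>1.7.2</version>
-     </dependency>
-     <dependency>
-       <groupId>org.aspectj</groupId>
-       <artifactId>aspectjweaver</artifactId>
-       <version>1.7.2</version>
-     </dependency>
-     <dependency>
-       <groupId>org.springframework</groupId>
-       <artifactId>spring-aspects</artifactId>
-       <version>${spring.version}</version>
-       <type>jar</type>
-     </dependency>
-     <dependency>
-       <groupId>org.springframework</groupId>
-       <artifactId>spring-core</artifactId>
-       <version>${spring.version}</version>
-     </dependency>
-     <dependency>
-       <groupId>org.springframework</groupId>
-       <artifactId>spring-web</artifactId>
-       <version>${spring.version}</version>
-     </dependency>
-     <dependency>
-       <groupId>org.springframework</groupId>
-       <artifactId>spring-webmvc</artifactId>
-       <version>${spring.version}</version>
-     </dependency>
-     <dependency>
-       <groupId>org.springframework.integration</groupId>
-       <artifactId>spring-integration-kafka</artifactId>
-       <version>1.3.0.RELEASE</version>
-     </dependency>
-     <dependency>
-       <groupId>commons-logging</groupId>
-       <artifactId>commons-logging</artifactId>
-       <version>1.1.1</version>
-     </dependency>
-     <dependency>
-       <groupId>org.slf4j</groupId>
-       <artifactId>slf4j-api</artifactId>
-       <version>1.6.4</version>
-       <type>jar</type>
-     </dependency>
-     <dependency>
-       <groupId>org.slf4j</groupId>
-       <artifactId>slf4j-log4j12</artifactId>
-       <version>1.6.4</version>
-       <type>jar</type>
-     </dependency>
-     <dependency>
-       <groupId>javax</groupId>
-       <artifactId>javaee-api</artifactId>
-       <version>7.0</version>
-     </dependency>
-     <dependency>
-       <groupId>com.fasterxml.jackson.core</groupId>
-       <artifactId>jackson-core</artifactId>
-       <version>2.7.6</version>
-     </dependency>
-     <dependency>
-       <groupId>com.fasterxml.jackson.core</groupId>
-       <artifactId>jackson-databind</artifactId>
-       <version>2.7.6</version>
-     </dependency>
-     <dependency>
-       <groupId>com.fasterxml.jackson.core</groupId>
-       <artifactId>jackson-annotations</artifactId>
-       <version>2.7.6</version>
-     </dependency>
-
-     <dependency>
-       <groupId>org.apache.avro</groupId>
-       <artifactId>avro</artifactId>
-       <version>1.7.7</version>
-     </dependency>
-   </dependencies>
-
-   <build>
-     <finalName>SpringKafkaDEMO</finalName>
-     <plugins>
-       <plugin>
-         <groupId>org.apache.maven.plugins</groupId>
-         <artifactId>maven-compiler-plugin</artifactId>
-         <version>3.3</version>
-         <dependencies>
-           <dependency>
-             <groupId>org.codehaus.plexus</groupId>
-             <artifactId>plexus-compiler-javac</artifactId>
-             <version>2.5</version>
-           </dependency>
-         </dependencies>
-         <configuration>
-           <source>1.7</source>
-           <target>1.7</target>
-           <encoding>UTF-8</encoding>
-           <compilerArguments>
-             <verbose />
-             <bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath>
-           </compilerArguments>
-         </configuration>
-       </plugin>
-     </plugins>
-   </build>
- </project>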
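Problems encountered:
1. The logging dependencies (logback/slf4j) used alongside Spring must be kept at the same version. Here I use org.slf4j 1.6.4 for both artifacts.
- <dependency>
-   <groupId>org.slf4j</groupId>
-   <artifactId>slf4j-api</artifactId>
-   <version>1.6.4</version>
-   <type>jar</type>
- </dependency>
- <dependency>
-   <groupId>org.slf4j</groupId>
-   <artifactId>slf4j-log4j12</artifactId>
-   <version>1.6.4</version>
-   <type>jar</type>
- </dependency>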
|
|
|