kafka spring 实例
使用定时器发送后结果如下kafka
15.安装kafka
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/ ... a_2.10-0.10.0.0.tgz
tar xf kafka_2.10-0.10.0.0.tgz
ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
/usr/local/zookeeper/bin/zkCli.sh
create /kafka ''
vim /usr/local/kafka/config/server.properties
broker.id=0
zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties
master slave 启动
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
创建topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic
安装完成 后测试 下载
productor
consumer
spring 接受信息
代码部分
applicationContext-kafka-productor.xml
Java代码 下载
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
http://www.springframework.org/s ... tegration-kafka.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/s ... ing-integration.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd">
<bean id="producerProperties"
class="org.springframework.beans.factory.config.PropertiesFactoryBean">
3600000
5
kafka.serializer.StringEncoder
1
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapterProductor"
kafka-producer-context-ref="producerContext"
auto-startup="true"
channel="pChannel"
order="3">
<int-kafka:producer-configuration
broker-list="172.23.27.120:9092,172.23.27.115:9092,172.23.27.116:9092"
key-serializer="stringSerializer"
value-class-type="java.lang.String"
value-serializer="stringSerializer"
topic="baoy-topic" />
applicationContext-kafka-consumer.xml
Java代码
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
http://www.springframework.org/s ... tegration-kafka.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/s ... ing-integration.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd">
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka" zk-connection-timeout="6000"
zk-session-timeout="6000" zk-sync-time="2000" />
<int-kafka:inbound-channel-adapter
id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
auto-startup="true" channel="cChannel">
<bean id="consumerProperties"
class="org.springframework.beans.factory.config.PropertiesFactoryBean">
smallest
10485760
5242880
1000
<int-kafka:consumer-context id="consumerContext"
consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
consumer-properties="consumerProperties">
<int-kafka:consumer-configuration
group-id="default" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
max-messages="5000">
KafkaConsumerService
Java代码 下载
@Service
public class KafkaConsumerService {
public void process(Map<string, map> msgs) {
for (Map.Entry<string, map> entry : msgs.entrySet()) {
System.out.println("======================================Consumer Message received: ");
System.out.println("=====================================Suchit Topic:" + entry.getKey());
for (String msg : entry.getValue().values()) {
System.out.println("================================Suchit Consumed Message: " + msg);
}
}
}
}
KafkaProductorService
@Service
ublic class KafkaProductorService {
@Autowired
@Qualifier("pChannel")
private MessageChannel messageChannel;
public void sendInfo(String topic, Object obj) {
System.out.println("---Service:KafkaService------sendInfo------");
messageChannel.send(MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.TOPIC,topic).build());
}
pom
Java代码 下载
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0
com.curiousby.baoyou.cn
SpringKafkaDEMO
war
0.0.1-SNAPSHOT
SpringKafkaDEMO Maven Webapp
http://maven.apache.org
4.2.5.RELEASE
junit
junit
4.7
jar
test
org.dbunit
dbunit
2.4.9
test
com.github.springtestdbunit
spring-test-dbunit
1.1.0
test
org.springframework
spring-test
${spring.version}
test
javax.servlet
javax.servlet-api
3.1.0
provided
org.aspectj
aspectjrt
1.7.2
org.aspectj
aspectjweaver
1.7.2
org.springframework
spring-aspects
${spring.version}
jar
org.springframework
spring-core
${spring.version}
org.springframework
spring-web
${spring.version}
org.springframework
spring-webmvc
${spring.version}
org.springframework.integration
spring-integration-kafka
1.3.0.RELEASE
commons-logging
commons-logging
1.1.1
org.slf4j
slf4j-api
1.6.4
jar
org.slf4j
slf4j-log4j12
1.6.4
jar
下载
javax
javaee-api
7.0
com.fasterxml.jackson.core
jackson-core
2.7.6
com.fasterxml.jackson.core
jackson-databind
2.7.6
com.fasterxml.jackson.core
jackson-annotations
2.7.6
org.apache.avro
avro
1.7.7
SpringKafkaDEMO
org.apache.maven.plugins
maven-compiler-plugin
3.3
org.codehaus.plexus
plexus-compiler-javac
2.5
1.7
1.7
UTF-8
${java.home}/lib/rt.jar:${java.home}/lib/jce.jar
遇到的问题:下载地址
1. spring 中 日志 中的 logback必须 保持一致 ,这里我使用 org.slf4j 1.6.4
org.slf4j
slf4j-api
1.6.4
jar
org.slf4j
slf4j-log4j12
1.6.4
jar
页:
[1]