7564321 发表于 2016-12-19 13:17:32

kafka spring 实例

使用定时器发送后结果如下





kafka


    15.安装kafka
    cd /usr/local/
    wget http://mirror.bit.edu.cn/apache/ ... a_2.10-0.10.0.0.tgz
    tar xf kafka_2.10-0.10.0.0.tgz
    ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
    chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
    chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka

    /usr/local/zookeeper/bin/zkCli.sh
    create /kafka ''

    vim /usr/local/kafka/config/server.properties
    broker.id=0
    zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka

    scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
    scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/

    scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
    scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties

    master slave 启动
    /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
    创建topic
    /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
    /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic


    /usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic

    /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic



安装完成 后测试 下载
productor

consumer

spring 接受信息

代码部分
applicationContext-kafka-productor.xml
Java代码 下载

      
    <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
      xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
      xmlns:task="http://www.springframework.org/schema/task"
      xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka   
                            http://www.springframework.org/s ... tegration-kafka.xsd
                            http://www.springframework.org/schema/integration   
                            http://www.springframework.org/s ... ing-integration.xsd
                            http://www.springframework.org/schema/beans   
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/task   
                            http://www.springframework.org/schema/task/spring-task.xsd">

         
         
         
            
         
      <bean id="producerProperties"
            class="org.springframework.beans.factory.config.PropertiesFactoryBean">
            
                  
                  3600000
                  5
                  kafka.serializer.StringEncoder
                  1
                  
            
         

         

         
            
         

      <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapterProductor"   
            kafka-producer-context-ref="producerContext"   
            auto-startup="true"   
            channel="pChannel"   
            order="3">
            
         

         

         
               
                <int-kafka:producer-configuration
                  broker-list="172.23.27.120:9092,172.23.27.115:9092,172.23.27.116:9092"   
                  key-serializer="stringSerializer"
                  value-class-type="java.lang.String"   
                  value-serializer="stringSerializer"
                  topic="baoy-topic" />
            
         
      


applicationContext-kafka-consumer.xml
Java代码

      
    <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
      xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
      xmlns:task="http://www.springframework.org/schema/task"
      xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka   
                            http://www.springframework.org/s ... tegration-kafka.xsd
                            http://www.springframework.org/schema/integration   
                            http://www.springframework.org/s ... ing-integration.xsd
                            http://www.springframework.org/schema/beans   
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/task   
                            http://www.springframework.org/schema/task/spring-task.xsd">

         
         
            
         
         
      <int-kafka:zookeeper-connect id="zookeeperConnect"
            zk-connect="172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka" zk-connection-timeout="6000"
            zk-session-timeout="6000" zk-sync-time="2000" />
         
      <int-kafka:inbound-channel-adapter
            id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
            auto-startup="true" channel="cChannel">
            
         
         
         

      <bean id="consumerProperties"
            class="org.springframework.beans.factory.config.PropertiesFactoryBean">
            
                  
                  smallest
                  10485760   
                  5242880
                  1000
                  
            
         
         
         
         
         

      <int-kafka:consumer-context id="consumerContext"
            consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
            consumer-properties="consumerProperties">
            
                <int-kafka:consumer-configuration
                  group-id="default" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
                  max-messages="5000">   
                     
                  
            
         
      


KafkaConsumerService
Java代码 下载

    @Service
    public class KafkaConsumerService {


      public void process(Map<string, map> msgs) {
            for (Map.Entry<string, map> entry : msgs.entrySet()) {
                System.out.println("======================================Consumer Message received: ");
                System.out.println("=====================================Suchit Topic:" + entry.getKey());
                for (String msg : entry.getValue().values()) {
                  System.out.println("================================Suchit Consumed Message: " + msg);
                }
            }
      }

    }


KafkaProductorService


    @Service
    ublic class KafkaProductorService {


       @Autowired
       @Qualifier("pChannel")
       private MessageChannel messageChannel;


       public void sendInfo(String topic, Object obj) {
         System.out.println("---Service:KafkaService------sendInfo------");   
         messageChannel.send(MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.TOPIC,topic).build());
       }



pom

Java代码 下载

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      4.0.0
      com.curiousby.baoyou.cn
      SpringKafkaDEMO
      war
      0.0.1-SNAPSHOT
      SpringKafkaDEMO Maven Webapp
      http://maven.apache.org


         
         
            4.2.5.RELEASE
         

         
            
            
                junit
                junit
                4.7
                jar
                test
            
            
                org.dbunit
                dbunit
                2.4.9
                test
            
            
                com.github.springtestdbunit
                spring-test-dbunit
                1.1.0
                test
            
               
                org.springframework
                spring-test
                ${spring.version}
                test
            


            
                javax.servlet
                javax.servlet-api
                3.1.0
                provided
            
            
                org.aspectj
                aspectjrt
                1.7.2
            
            
                org.aspectj
                aspectjweaver
                1.7.2
            
            
                org.springframework
                spring-aspects
                ${spring.version}
                jar
            
            
                org.springframework
                spring-core
                ${spring.version}
            
            
                org.springframework
                spring-web
                ${spring.version}
            
            
                org.springframework
                spring-webmvc
                ${spring.version}
            
            
                org.springframework.integration
                spring-integration-kafka
                1.3.0.RELEASE
            
            
                commons-logging
                commons-logging
                1.1.1
            
            
                org.slf4j
                slf4j-api
                1.6.4
                jar
            
            
                org.slf4j
                slf4j-log4j12
                1.6.4
                jar
            下载
            
                javax
                javaee-api
                7.0
            
            
                com.fasterxml.jackson.core
                jackson-core
                2.7.6
            
            
                com.fasterxml.jackson.core
                jackson-databind
                2.7.6
            
            
                com.fasterxml.jackson.core
                jackson-annotations
                2.7.6
            

            
                org.apache.avro
                avro
                1.7.7
            

         
         
            SpringKafkaDEMO
            
                  
                  org.apache.maven.plugins
                  maven-compiler-plugin
                  3.3
                     
                        
                            org.codehaus.plexus
                            plexus-compiler-javac
                            2.5
                        
                     
                     
                        1.7
                        1.7
                        UTF-8
                        
                              
                            ${java.home}/lib/rt.jar:${java.home}/lib/jce.jar
                        
                     
                  
            
         
      


遇到的问题:下载地址
1. spring 中 日志 中的 logback必须 保持一致   ,这里我使用 org.slf4j 1.6.4


    org.slf4j
                slf4j-api
                1.6.4
                jar
            
            
                org.slf4j
                slf4j-log4j12
                1.6.4
                jar
            




页: [1]
查看完整版本: kafka spring 实例