设为首页 收藏本站
查看: 1162|回复: 0

[经验分享] kafka use

[复制链接]

尚未签到

发表于 2017-5-23 14:53:30 | 显示全部楼层 |阅读模式
kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:


  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
  • 支持通过kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

卡夫卡的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。



kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。几个基本概念:
(1)message(消息)是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。如果consumer订阅了这个主题,那么新发布的消息就会广播给这些consumer。
(2)Kafka是显式分布式的,多个producer、consumer和broker可以运行在一个大的集群上,作为一个逻辑整体对外提供服务。对于consumer,多个consumer可以组成一个group,这个message只能传输给某个group中的某一个consumer.

1、kafka安装


1

http://kafka.apache.org/documentation.html#quickstart
wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
[iyunv@itr-mastertest01 local]# tar -zxvf software/kafka_2.10-0.8.2-beta.tgz
#TCP TEST
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
# vim config/server.properties
brokerid:这个每个server(broker)必须唯一,写数字
host.name:这个也是唯一的,写ip或者hostname
[iyunv@itr-mastertest01 config]# egrep -v "^$|#" server.properties  
broker.id=1
port=9092
host.name=itr-mastertest01  #所在节点的ip地址,如果不想配置全为localhost即可
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=65536
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181
zookeeper.connection.timeout.ms=2000
[iyunv@itr-mastertest01 config]# egrep -v "^$|#" zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
[iyunv@itr-mastertest01 local]# scp -rq kafka_2.10-0.8.2-beta itr-mastertest02:/usr/local/

2、启动kafka


1

#启动之前zookeeper必须启动!zookeeper请参考[zookeeper cluster deploy](http://www.itweet.cn/2015/07/12/zk-cluster-deploy/)
[iyunv@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &
--建议下面方式启动否则关闭终端kafka就挂了
# nohup kafka/bin/kafka-server-start.sh kafka/config/server.properties > kafka/kafka-logs/kafka-server.log &
[iyunv@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &
[iyunv@itr-nodetest01 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &
[iyunv@itr-nodetest02 kafka_2.10-0.8.2-beta]# bin/kafka-server-start.sh config/server.properties &
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids   
null
cZxid = 0x1d00000939
ctime = Tue May 12 23:52:57 CST 2015
mZxid = 0x1d00000939
mtime = Tue May 12 23:52:57 CST 2015
pZxid = 0x1d00000ca7
cversion = 4
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0

3、创建一个topic


1

[iyunv@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --create --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --replication-factor 3 --partitions 1 --topic test

4、命令行查看topic


1

[iyunv@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --list --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181
test

5、发送一些消息


1

[iyunv@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-console-producer.sh --broker-list itr-mastertest01:9092 --topic test         
This is a message
This is another message
[zk: localhost:2181(CONNECTED) 7] get /brokers/topics/test/partitions/3/state
{"controller_epoch":1,"leader":3,"version":1,"leader_epoch":0,"isr":[3,1]}
[zk: localhost:2181(CONNECTED) 12] get /brokers/ids/3
{"jmx_port":-1,"timestamp":"1431447378690","host":"itr-nodetest01","version":1,"port":9092}

6、开始消费信息


1

[iyunv@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test --from-beginning
This is a message
This is another message
[iyunv@itr-nodetest01 kafka_2.10-0.8.2-beta]#  bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg...
[iyunv@itr-nodetest02 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test --from-beginning
This is a message
This is another message
kafaka to slave msg.

8、查看集群topic详细信息


1

[iyunv@itr-mastertest02 kafka_2.10-0.8.2-beta]# bin/kafka-topics.sh --describe --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic test
Topic:test      PartitionCount:4        ReplicationFactor:2     Configs:
Topic: test     Partition: 0    Leader: 4       Replicas: 4,2   Isr: 4,2
Topic: test     Partition: 1    Leader: 1       Replicas: 1,3   Isr: 1,3
Topic: test     Partition: 2    Leader: 2       Replicas: 2,4   Isr: 2,4
Topic: test     Partition: 3    Leader: 3       Replicas: 3,1   Isr: 3,1
消息被写到目录:
[iyunv@itr-mastertest02 bin]# ls /tmp/kafka-logs/test-2
00000000000000000000.index  00000000000000000000.log

9、删除topic


1

$ kafka-topics.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
配置server.config , delete.topic.enable=true即可删除

10、kafka的webui



  • Kafka监控工具一


    1

    https://github.com/quantifind/KafkaOffsetMonitor
    wget https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar
    Running It
    This is a small webapp, you can run it locally or on a server, as long as you have access to the ZooKeeper nodes controlling kafka.
    java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
    com.quantifind.kafka.offsetapp.OffsetGetterWeb \
    --zk zk-server1,zk-server2 \
    --port 8080 \
    --refresh 10.seconds \
    --retain 2.days
    The arguments are:
    -   zk the ZooKeeper hosts
    -   port on what port will the app be available
    -   refresh how often should the app refresh and store a point in the DB
    -   retain how long should points be kept in the DB
    -   dbName where to store the history (default 'offsetapp')
    [iyunv@itr-mastertest01 local]# mkdir kafka-offset-console
    [iyunv@itr-mastertest01 local]# cd kafka-offset-console/
    [iyunv@itr-mastertest01 kafka-offset-console]# cat mobile_start_en.sh
    #!/bin/bash
    cd /usr/local/kafka-offset-console
    java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp ./KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk itr-mastertest01,itr-mastertest02,itr-nodetest01/config/mobile/kafka-offset-console --port 9999 --refresh 10.seconds --retain 7.days 1>./stdout.log 2>./stderr.log &
    [iyunv@itr-mastertest01 kafka-offset-console]# chmod +x mobile_start_en.sh  
    [iyunv@itr-mastertest01 kafka-offset-console]# sh mobile_start_en.sh
    [iyunv@itr-mastertest01 kafka-offset-console]# tail -f stdout.log
    serving resources from: jar:file:/usr/local/kafka-offset-console/KafkaOffsetMonitor-assembly-0.2.1.jar!/offsetapp


  • Kafka监控工具二


    1

    https://github.com/yahoo/kafka-manager
    [iyunv@itr-mastertest01 kafka-manager]# echo $JAVA_HOME
    /usr/local/jdk1.7.0_45
    [iyunv@itr-mastertest01 sbt]# sbt -version
    Getting org.scala-sbt sbt 0.13.8 ...
    https://github.com/yahoo/kafka-manager
    [iyunv@itr-mastertest01 hsu]# yum install git -y
    [iyunv@itr-mastertest01 hsu]# git clone https://github.com/yahoo/kafka-manager.git
    [iyunv@itr-mastertest01 hsu]# cd kafka-manager/
    [iyunv@itr-mastertest01 kafka-manager]# sbt clean dist #这里生产zip包
    Configuration
    $ unzip kafka-manager-1.0-SNAPSHOT.zip
    在kafka-manager安装包的conf目录下面的application.conf文件中进行配置
    kafka-manager.zkhosts="itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181"
    Starting the service
    $ bin/kafka-manager -Dconfig.file=./conf/application.conf -Dhttp.port=8081  > manager-ui.log &



这个工具需要自己编译,也可以直接找我获取编译包!


  • Kafka监控工具三

    1

    https://github.com/shunfei/DCMonitor
    druid:
    https://github.com/alibaba/druid/wiki/%E9%85%8D%E7%BD%AE_StatViewServlet%E9%85%8D%E7%BD%AE


11、kafka和MQ区别


1

https://github.com/alibaba/RocketMQ/wiki/rmq_vs_kafka
http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines


  • 性能对比
    · Kafka单机写入TPS约在百万条/秒,消息大小10个字节
    · RocketMQ单机写入TPS单实例约7万条/秒,单机部署3个Broker,可以跑到最高12万条/秒,消息大小10个字节
    总结:Kafka的TPS跑到单机百万,主要是由于Producer端将多个小消息合并,批量发向Broker。

12、项目分析


1

数据源:
Oracle订单表

写代码
发送数据给kafka
接收:
[iyunv@itr-mastertest01 kafka_2.10-0.8.2-beta]# bin/kafka-console-consumer.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic order --from-beginning
998801026       11118   731     44      0       2015-05-17 00:36:08
46300226        68178   556     14      0       2015-05-17 00:36:08
367575834       42812   820     21      0       2015-05-17 00:36:08
97829386        96289   583     67      1       2015-05-17 00:36:08
#需要拷贝kafka的包到storm相应的lib目录下
#zk创建节点
[zk: localhost:2181(CONNECTED) 11] create /order 1
Created /order
[zk: localhost:2181(CONNECTED) 12] create /order/id 1
Created /order/id
[zk: localhost:2181(CONNECTED) 13] ls /order
[id]
#创建topic
kafka-topics.sh --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --delete --topic order
bin/kafka-topics.sh --create --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --replication-factor 3 --partitions 4 --topic order
bin/kafka-topics.sh --describe --zookeeper itr-mastertest01:2181,itr-mastertest02:2181,itr-nodetest01:2181 --topic order
#打包测试1
storm-0.9.2/bin/storm jar storm-code-1.0-SNAPSHOT-jar-with-dependencies.jar com.kafka_storm.topology.CounterTopology
#打包测试2
$ storm/bin/storm jar storm-test_fat.jar com.kafka_storm.topology.OrderTopology
#优化数据结构保证数据不丢失!通过存储到第三方内存系统中如redis/memcached
#zk分布式锁保证多线程处理数据,数据一致性,使用第三方封装zkcli包,保证每次只有一个线程去操作mysql,而不是多个线程导致数据混乱!保证同一条数据不会被多个线程同时处理![这样就可以topology开启多线程处理数据!]
http://repo1.maven.org/maven2/com/netflix/curator/
#集群模式测试,多线程
# storm/bin/storm jar storm-test_fat.jar com.kafka_storm.topology.OrderTopology  OrderTopology > order.log

参考:


  • 【0】kafka:https://kafka.apache.org/documentation.html#gettingStarted
  • 【1】Kafka主页:http://sna-projects.com/kafka/design.php
  • 【3】Kafka与Hadoop:http://sna-projects.com/sna/media/kafka_hadoop.pdf

原创文章,转载请注明: 转载自sparkjvm的博客

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379778-1-1.html 上篇帖子: 【Spark六十一】Spark Streaming结合Flume、Kafka进行日志分析 下篇帖子: 【Kafka一】Kafka入门
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表