|
一、kafka理论
1、kafka是神马?
kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
2、kafka的设计方案
消息持久化及其缓存
磁盘性能:在传统的磁盘写入很慢,因为它使用随机写入 50k/s(6个7200转的sata硬盘组成的raid-5),但是线性写入速度有300ms/s的速度,所以kafka利用线性写入的方式。
线性写入:将数据调用操作系统文件接口写到文件系统里面去这样就缓存到操作系统的页面缓存中,然后传统意思来说将其flush一下到磁盘中,但是kafka并没有这样,而是保存在页面缓存中(相当于放在内存当中)并没有进行flush操作,这样他就会提供比较高的读的性能,下次读就从内核页面缓存中读数据,但是内存中存储数量不是无限大的,所以我们配置参数(每当接收到N条信息或者每过M秒),进行一个flush操作,从而可以为系统硬件崩溃时“处于危险之中”的数据在量上加个上限。
kafka的缓存不是在内存中保存尽可能多的数据并在需要时将这些数刷新到文件系统,而是做完全相反的事情,将所有的数据立即写入文件系统中的持久化的日志中,但不进行刷刷新数据的调用,实际这么做意味着数据被传输到os内核的页面缓存中去了,随后在根据配置刷新到硬盘。
持久化常量时间
以往的消息系统元数据的持久化数据结构往往采用BTree。BTree是目前最常用额数据结构,在消息系统中它可以用来广泛支持多种不同的事物性活非事务性的语义,他的确带来了一个非常高的处理开销,Btree的运算时间的复杂度为0(LOG n),Btree需要一种非常负责的页面级或者行级锁才能避免在每次操作时锁定整颗树。实现这种机器就要为行级锁定付出非常昂贵的代价,负责就必须所有的读取操作进行串行化(serialize).因为对磁盘寻道操作的高度依赖,就不太可能高效的从驱动器密码的提高中获得改善,因而就不得不使用容量较小(/data/myid
# echo "2">/data/myid
# echo "3">/data/myid
3、kafka安装
wget https://archive.apache.org/dist/kafka/0.8.1/kafka_2.10-0.9.0.1.tgz
tar zxvf kafka_2.10-0.9.0.1.tgz
nohup bin/kafka-server-start.sh config/server.properties &
二、配置文件详解
#server.properties配置文件
broker.id=1
port=9092
host.name=10.0.0.8
zookeeper.connect=10.0.0.8:2181,10.0.0.0.9:2181,10.0.0.10:2181
num.network.threads=8 #broker处理消息的最大线程数 一般等于核心数
num.io.threads=8# 同上
socket.send.buffer.bytes=1048576 #socket 发送缓冲区 socket调优参数
socket.receive.buffer.bytes=1048576 #接收缓冲区 socket调优参数
socket.request.max.bytes=104857600 #socket请求最大数值,防止serverOOM
log.dirs=/data/kafka/kafka-logs
num.partitions=8 #默认制定分区 会被命令行参数覆盖
log.retention.check.interval.ms=60000 #文件大小检查周期
log.cleaner.enable=false #是否启用压缩
segment.ms=24*60*60
num.replica.fetchers=4 #leader 进行复制的线程数。 增大这个数值会增加follow的io
default.replication.factor=2 # 创建topic的时候的副本数,可以创建topic时制定参数覆盖
replica.fetch.max.bytes=2048000 # replicas 每次获取数据的最大大小
replica.fetch.wait.max.ms=500 # replicas 同leader之间的通信的最大等待时间,失败了会重试。
replica.high.watermark.checkpoint.interval.ms=5000 #每个replica检查是否将最高水平进行固化的频率
replica.socket.timeout.ms=30000 #follower与leader之间的socket超时时间
replica.socket.receive.buffer.bytes=65536 #leader复制时间的socket缓存大小
replica.lag.time.max.ms=10000 #replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.max.messages=4000 #如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效 ##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后 ##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移 ##到其他follower中. ##在broker数量较少,或者网络不足的环境中,建议提高此值.
controller.socket.timeout.ms=30000 # partition leader与replicas之间通讯时,socket的超时时间
controller.message.queue.size=10 #partition leader与replicas 数据同步时,消息的队列尺寸
message.max.bytes=2048000 #消息体的最大大小 但是是字节 --------------------
auto.create.topics.enable=true #是否允许自动创建topic,若是false就需要通过命令创建topic
log.index.interval.bytes=4096 #当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.index.size.max.bytes=10485760 #对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.retention.hours=24 #数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理。
log.flush.interval.ms=10000 #仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发. --------
log.flush.interval.messages=20000 #log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
log.flush.scheduler.interval.ms=2000 #检查是否需要固化到硬盘的时间间隔
log.roll.hours=24 #这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖
log.cleanup.interval.mins=30 #检查处理规则间隔
log.segment.bytes=1073741824 #一个消息长度 超过在创建一个
zookeeper.connection.timeout.ms=6000 #zookeeper连接超时时间
zookeeper.sync.time.ms=2000 #一个zk flower能落后leader多久
fetch.purgatory.purge.interval.requests=1000 ## f防止oom 的参数 用于request 状态转变为complete后从purgatory中移除。
producer.purgatory.purge.interval.requests=1000 ## f防止oom 的参数 用于request 状态转变为complete后从purgatory中移除。
三、kafka常用操作
1、查看topic list
bin/kafka-topics.sh --list --zookeeper x.x.x.x:2181
2、创建topic
bin/kafka-topics.sh --create --zookeeper x.x.x.1:2181,x.x.x.2:2181,x.x.x.3:2181 --replication-factor 3 --partitions 9 --topic topic_name1
--topic test_kafka_14
--zookeeper 指定多个zookeeper服务器
--replication 创建副本数 3表示一共有3个复本,3台broker的话,最大允许2台宕机,如果复本2个的话最多就只能宕机一台。
3、查看topic状态
bin/kafka-topics.sh --describe --zookeeper x.x.x.1:2181 --topic test_kafka_14
--describe 描述参数
--zookeeper 指定一台zookeeper
结果:
Topic:test-0926PartitionCount:9ReplicationFactor:3Configs:
Topic: test-0926Partition: 0Leader: 1Replicas: 1,2,0Isr: 2,0,1
Topic: test-0926Partition: 1Leader: 2Replicas: 2,0,1Isr: 2,0,1
Topic: test-0926Partition: 2Leader: 0Replicas: 0,1,2Isr: 2,0,1
Topic: test-0926Partition: 3Leader: 1Replicas: 1,0,2Isr: 2,0,1
Topic: test-0926Partition: 4Leader: 2Replicas: 2,1,0Isr: 2,0,1
Topic: test-0926Partition: 5Leader: 0Replicas: 0,2,1Isr: 2,0,1
Topic: test-0926Partition: 6Leader: 1Replicas: 1,2,0Isr: 2,0,1
Topic: test-0926Partition: 7Leader: 2Replicas: 2,0,1Isr: 2,0,1
Topic: test-0926Partition: 8Leader: 0Replicas: 0,1,2Isr: 2,0,1
单行代表:指标名称 分区号 目前该分区负责读写的leader 该分区数据的复本所在leader 当前分区可用(可接管)的leader.
4、手动重新分配分区
bin/kafka-preferred-replica-election.sh --zookeeper x.x.x.1:2181,x.x.x.2:2181,x.x.x.3:2181
默认9.0.1版本也会自动调整分区即负载均衡在某个broker宕机又重新启动后。
5、增加分区(kafka只支持增加不支持动态减少)
bin/kafka-add-partitions.sh --topic test --partition 2
--zookeeper
192.168.197.170:2181,192.168.197.171:2181 (为topic test增加2个分区)
bin/kafka-topics.sh --zookeeper 10.3.12.49:2181,10.3.12.53:2181,10.3.12.54:2181 -alter --partitions 6 --topic leo-ops-main-client1 6、控制台接收消息
bin/kafka-console-consumer.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --from-beginning --topic test 7、控制台发送消息
bin/kafka-console-producer.sh --broker-list 192.168.197.170:9092,192.168.197.171: 9092
--topic test 8、kafka之删除topic
例如要删除 test_kafka_1
1、先在kafka删除topic
./bin/kafka-topics.sh --delete --zookeeper 【zookeeper server】 --topic test_kafka_1
## 停止 kakfa
2、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")
cd /KAFKADIR ; rm -rf test_kafka_1-*
3、zookeeper删除topic
zookeeper-3.4.9/bin/zkCli.sh -server 127.0.0.1:2181
rmr /brokers/topics/test_kafka_1
4、清空zookeeper admin下删除topic
rmr /admin/delete_topics/test_kafka_1
5、删除zookeeper中的消费记录
#删除topic test的consumer group,如果有消费记录的话
rmr /kafka/consumers/test_kafka_1*
# 停止zookeeper
6、清空replication信息否则将一直报错。
vi replication-offset-checkpoint
原文为:
0
15
pjtest 4 0
pjtest 8 0
managejob 5 0
pjtest 6 0
pjtest 0 0
pjtest 5 0
pjtest 3 0
pjtest 7 0
pjtest 9 0
pjtest 2 0
managejob 9 0
managejob 1 0
managejob 3 0
pjtest 1 0
managejob 7 0
修改为:
0
5
managejob 5 0
managejob 9 0
managejob 1 0
managejob 3 0
managejob 7 0
vi recovery-point-offset-checkpoint
同理修改
|
|