设为首页 收藏本站
查看: 1454|回复: 0

[经验分享] 在团800运维工作总结之kafka集群日常工作经验总结

[复制链接]

尚未签到

发表于 2019-1-31 11:27:37 | 显示全部楼层 |阅读模式

  •   一些重要的原理
      基本原理什么叫broker partition cg我就不在这里说了,说一些自己总结的原理
      

      

      1.kafka有副本的概念,每个副本都分在不同的partition中,这中间分为leader和fllower
      2.kafka消费端的程序一定要和partition数量一致,不可以多,会出现有些consumer获取
      不到数据的现象
      3.producer原理
      producer通过zookeeper获取所连接的topic都在那些partiton中,每个parition的leader是那
      个,针对leader进行写操作,prodcer通过zookeeper的watch机制来记录以上的信息,pro
      ducer为了节省网络的io,还会在本地先把消息buffer起来,并将他们批量发送到broker中
      4.consumer原理
      consumer向broker发送fetch请求,并告知获取的消息offset,在kafka中采用pull方式,消费端
      主动pull消息,优点:消费者可以控制消费的数量
  

  

  2.kafka生产环境常用命令总结
  1.模拟生产端,推送数据
  ./bin/kafka-console-producer.sh --broker-list 172.16.10.130:9092 --topic deal_exposure_origin
  

  2.模拟消费端,消费数据
  ./bin/kafka-console-consumer.sh --zookeeper 1172.16.10.140:2181  --topic deal_exposure_origin
  

  3.创建topic,topic partiton数量 副本数 数据过期时间
  ./kafka-topics.sh --zookeeper spark:2181 --create --topic deal_task_log  --partitions 15 --replication-factor 1 retention.ms 1296000000
  

  3.kafka如何动态的添加副本
  1.副本,kafka一定要设置副本,如果之后再加会由于涉及到数据的同步,会把集群的io提升上去
  

  3.如何扩大副本
  2.把所有topic的信息记录到json文件中,信息有topic名称,用了哪些partition,副本在那个partition,
  并修改json数据,添加副本数
  #!/usr/bin/python
  from kazoo.client import KazooClient
  import random
  import json
  

  zk = KazooClient(hosts='172.16.11.73:2181')
  zk.start()
  for i in zk.get_children('/brokers/topics'):
  b= zk.get('/brokers/topics/'+i)[0]
  a = eval(b)['partitions']
  list = []
  dict = {}
  for key,value in a.items():
  if len(value) == 1:
  c = {}
  c['topic'] = i.encode('utf-8')
  c['partition'] = int(key)
  list1 = []
  for ii in range(0,3):
  while True:
  if list1:
  pass
  else:
  for iii in value:
  list1.append(iii)
  if len(list1) == 3:
  break
  num = random.randint(0,4)
  #print 'num='+str(num),'value='+str(value)
  if num not in list1:
  list1.append(num)
  #print list1
  c['replicas'] = list1
  list.append(c)
  

  version = eval(b)['version']
  dict['version'] = version
  dict['partitions'] = list
  #jsondata = json.dumps(dict)
  json.dump(dict,open('/opt/json/'+i+'.json','w'))
  
  3.加载json文件
  /usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-reassign-partitions.sh --zookeeper 192.168.5.159:2181 --reassignment-json-file /opt/test.json --execute
  

  4.查看是否已经添加了副本
  usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --describe --zookeeper 192.168.5.159:2181 --topic testtest
  Topic:testtest  PartitionCount:15       ReplicationFactor:2     Configs:
  Topic: testtest Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 3    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 4    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 5    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 6    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 7    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 8    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 9    Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 10   Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 11   Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 12   Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 13   Leader: 0       Replicas: 0,1   Isr: 0,1
  Topic: testtest Partition: 14   Leader: 0       Replicas: 0,1   Isr: 0,1
  

  4.kafka集群之间做数据同步
  
  找一个broker节点进行同步
  1.创建配置文件mirror_consumer.config
  配置文件里写本地的kafka集群zookeeper
  定义一个group用来去消费所有的topic,进行同步
  zookeeper.connect=172.16.11.43:2181,172.16.11.46:2181,172.16.11.60:2181,172.16.11.67:2181,172.16.11.73:2181
  group.id=backup-mirror-consumer-group
  

  2.创建配置文件mirror_producer.config
  zookeeper,kafka ip写对端集群的ip
  zookeeper.connect=172.17.1.159:2181,172.17.1.160:2181
  metadata.broker.list=172.17.1.159:9092,172.17.1.160:9092
  

  3.同步命令
  $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"
  

  参数详解
  1. 白名单(whitelist) 黑名单(blacklist)
  mirror-maker接受精确指定同步topic的白名单和黑名单。使用java标准的正则表达式,为了方便,逗号(‘,’)被编译为java正则中的(‘|’)。
  2. Producer timeout
  为了支持高吞吐量,你最好使用异步的内置producer,并将内置producer设置为阻塞模式(queue.enqueueTimeout.ms=-1)。这样可以保证数据(messages)不会丢失。否则,异步producer默认的 enqueueTimeout是0,如果producer内部的队列满了,数据(messages)会被丢弃,并抛出QueueFullExceptions异常。而对于阻塞模式的producer,如果内部队列满了就会一直等待,从而有效的节制内置consumer的消费速度。你可以打开producer的的trace logging,随时查看内部队列剩余的量。如果producer的内部队列长时间处于满的状态,这说明对于mirror-maker来说,将消息重新推到目标Kafka集群或者将消息写入磁盘是瓶颈。
  对于kafka的producer同步异步的详细配置请参考$KAFKA_HOME/config/producer.properties文件。关注其中的producer.type和queue.enqueueTimeout.ms这两个字段。
  3. Producer 重试次数(retries)
  如果你在producer的配置中使用broker.list,你可以设置当发布数据失败时候的重试次数。retry参数只在使用broker.list的时候使用,因为在重试的时候会重新选择broker。
  4. Producer 数量
  通过设置—num.producers参数,可以使用一个producer池来提高mirror maker的吞吐量。在接受数据(messages)的broker上的producer是只使用单个线程来处理的。就算你有多个消费流,吞吐量也会在producer处理请求的时候被限制。
  5. 消费流(consumption streams)数量
  使用—num.streams可以指定consumer的线程数。请注意,如果你启动多个mirror maker进程,你可能需要看看其在源Kafka集群partitions的分布情况。如果在每个mirror maker进程上的消费流(consumption streams)数量太多,某些消费进程如果不拥有任何分区的消费权限会被置于空闲状态,主要原因在于consumer的负载均衡算法。
  6. 浅迭代(Shallow iteration)与producer压缩
  我们建议在mirror maker的consumer中开启浅迭代(shallow iteration)。意思就是mirror maker的consumer不对已经压缩的消息集(message-sets)进行解压,只是直接将获取到的消息集数据同步到producer中。
  如果你开启浅迭代(shallow iteration),那么你必须关闭mirror maker中producer的压缩功能,否则消息集(message-sets)会被重复压缩。
  7. Consumer 和 源Kafka集群(source cluster)的 socket buffer sizes
  镜像经常用在跨集群场景中,你可能希望通过一些配置选项来优化内部集群的通信延迟和特定硬件性能瓶颈。一般来说,你应该对mirror-maker中consumer的socket.buffersize 和源集群broker的socket.send.buffer设定一个高的值。此外,mirror-maker中消费者(consumer)的fetch.size应该设定比socket.buffersize更高的值。注意,套接字缓冲区大小(socket buffer size)是操作系统网络层的参数。如果你启用trace级别的日志,你可以检查实际接收的缓冲区大小(buffer size),以确定是否调整操作系统的网络层。
  

  

  4.如何检验MirrorMaker运行状况
  Consumer offset checker工具可以用来检查镜像对源集群的消费进度。例如:
  bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect localhost:2181 --topic test-topic
  KafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
  Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0
  Consumer offset = 561154288
  = 561,154,288 (0.52G)
  Log size = 2231392259
  = 2,231,392,259 (2.08G)
  Consumer lag = 1670237971
  = 1,670,237,971 (1.56G)
  BROKER INFO
  0 -> 127.0.0.1:9092
  注意,–zkconnect参数需要指定到源集群的Zookeeper。另外,如果指定topic没有指定,则打印当前消费者group下所有topic的信息。
  

  

  5.kafka所使用的磁盘io过高解决方法
  
  问题:kafka所用磁盘io过高
  我们生产平台有5台kafka机器,每台机器上分了2块磁盘做parition
  最近发现kafka所使用的磁盘io非常高,影响到了生产端推送数据的性能
  一开始以为是由于一个推送日志的topic所导致的的,因为每秒推送数据大概在2w左右,
  后来把此topic迁移到了其他的kafka集群中还是未见效果
  最终iotop发现其实是由于zookeeper持久化的时候导致的
  zookeeper持久化的时候也写到kafka所用到的磁盘中
  

  通过此问题说明几点问题
  1.kafka用zookeeper,和大家所熟悉的其他应用例如solrcloud codis otter不太一样
  一般用zookeeper都是管理集群节点用,而kafka用zookeeper是核心,生产端和消费端都会去
  链接zookeeper获取响应的信息
  生产端通过链接zookeeper获取topic都用了那些parition,每个parition的副本的leader是那个
  消费端链接zookeeper获取offset,消费端消费都会操作对zookeeper的数据进行修改,对io的操作
  很频繁
  

  解决方法:
  禁止zookeeper做持久化操作
  配置文件中添加一行
  forceSync=no
  问题解决
  






运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669991-1-1.html 上篇帖子: kafka kerberos 认证访问与非认证访问共存下的ACL问题 下篇帖子: 消息订阅发布系统Apache Kafka分布式集群环境搭建和简单测试
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表