生活如麻 发表于 2019-1-31 08:44:07

Kafka系列之1—Kafka的总体认识

Kafka的总体认识
  1.非中心的架构模型
  2.基于TCP的一套Kafka通信协议
  3.消息中间件&存储系统
  4.存储逻辑层的高并发保证
  5.isr机制降低了保证分布式一致性的代价
1. 非中心的架构模型
我们知道,在分布式系统的架构类型里,既有主从式的架构,也有非中心式的架构,像hadoop和hbase都采用了主从式的架构模型,主从式的架构优点有很多,但是主从式下为了避免单点故障而采取的各种策略使得主从式架构的优点并不那么理想,kafka作为一个分布式的消息系统,它采用了非中心式的架构模型,每个节点都作为独立的Server向Client提供服务,在集群环境下,多个节点依赖zookeeper维护client在读写访问中的分布式一致性。
在早期0.8.2之前的kafka版本中,kafka对zookeeper的依赖非常大,producer、server、consumer都非常依赖zookeeper,虽然zookeeper作为一个轻量级的文件系统(已经成为分布式服务的基础服务,用以提供分布式环境下的一致性),但是大量的与其交互仍然会导致一些性能问题和不稳定的方面,在0.8.2之后的改进中,通过将一些状态保持在kafka自身中,减少与zookeeper的大量交互,为读写提供了更稳定的实现。
2. 基于TCP的一套Kafka通信协议
2.1 概述
kafka的通信协议相当的简单,只有六类核心的客户端请求APIs。

[*]  Metadata:描述当前可用的brokers的host和port信息,并给出每个broker上分配了哪些partitions;
[*]  Send:发送messages到broker;
[*]  Fetch:从broker中获取messages,包括获取data、获取集群的元数据信息以及获取某个topic的offset信息;
[*]  Offsets:获取某个给定topic partition的可用offsets信息;
[*]  Offset Commit:提交consumer group的offsets信息;
[*]  Offset Fetch:获取某个consumer group的offsets信息集合。

这些都会在下面详细描述。另外,0.9版本的kafka对consumers和kafka connect支持一般的group management。这部分的Client Api包括五种requests:

[*]  GroupCoordinator:定位当前consumer group的coordinator;
[*]  JoinGroup:加入一个consumer group,如果没有就创建一个;
[*]  SyncGroup:同步同一个group下的所有consumer状态(partition分配到consumer的分布情况);
[*]  Heartbeat:用来检测group中的成员的存活状态;
[*]  LeaveGroup:直接离开一个group。
还有一些用于监控/管理 kafka集群的administrativeAPIs

[*]  DescribeGroups:用来检测当前的groups;
[*]  ListGroups:列出broker中管理的groups。
Request格式如下:
  RequestMessage => ApiKey ApiVersionCorrelationId ClientId RequestMessage
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest |OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
Response格式如下:
  Response => CorrelationIdResponseMessage
  CorrelationId => int32
  ResponseMessage => MetadataResponse |ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse |OffsetFetchResponse
  ApiKey
  这个int数值是用来表明是哪一种请求,KafkaApis根据这个值来调用相应的处理逻辑
  ApiVersion
  由于不同的Kafka版本支持的ApiVersion不同,因此要根据KafkaServer支持的ApiVersion来发送对应格式的Request
  CorrelationId
  客户端提供的一个整型值,在response中会原封不动的返回,它的作用主要是用来匹配client和server之间的request和response。
2.3 ApiKey
  下面的列表是ApiKey的整型值对应的Request类:
API name
ApiKey Value
ProduceRequest
0
FetchRequest
1
OffsetRequest
2
MetadataRequest
3
Non-user facing control APIs
4-7
OffsetCommitRequest
8
OffsetFetchRequest
9
GroupCoordinatorRequest
10
JoinGroupRequest
11
HeartbeatRequest
12
LeaveGroupRequest
13
SyncGroupRequest
14
DescribeGroupsRequest
15
ListGroupsRequest
16
2.4Response中的Error Codes
  Error
  Code
  重试
  描述
  NoError
  0

  没有错误,执行成功!
  Unknown
  -1

  未知的server error
  OffsetOutRange
  1

  请求的offset值超出了server端维护的对应topic/partition的offset值(可以大于也可以小于)
  InvalidMessage/CorruptMessage
  2
  YES
  消息内容不能通过CRC校验
  UnknownTopicOrPartition
  3
  YES
  请求的topic或partition不再发往的broker上
  InvalidMessageSize
  4

  消息的大小为负值
  LeaderNotAvailable
  5
  YES
  请求发生在leader选举过程中时抛出这个异常,此时请求的partition没有leader无法读写
  NotLeaderForPartition
  6
  YES
  在客户端尝试向不是leader的replica写入信息时抛出,意味着客户端的元数据信息过期了
  RequestTimedOut
  7
  YES
  request超过了用户指定的时间,一般是值socket超时
  BrokerNotAvailable
  8

  这个错误不是client遇到的,往往发生在工具类的请求中
  ReplicaNotAvailable
  9

  broker上没有期望的replica(可以被安全的忽视)
  MessageSizeTooLarge
  10

  server有一个最大消息的配置,当client向server端写入超过配置大小的message时抛出
  StaleControllerEpochCode
  11

  在broker和broker通信时发生的内部错误
  OffsetMetadataTooLargeCode
  12

  如果指定了一个大于配置的offset metadata大小的string
  GroupLoadInProgressCode
  14
  YES
  当topic partition的leader发生变化后,新的leader在load offsets时,offset fetch request请求时抛出,或者在groupmembership(例如heartbeat)的response中返回当coordinator在load groupmetadata时
  GroupCoordinatorNotAvailableCode
  15
  YES
  offsets topic还没创建或者group coordinator没有active
  NotCoordinatorForGroupCode
  16
  YES
  offset fetch或commit request的请求发往一个不是coordinator的节点
  InvalidTopicCode
  17

  访问一个不可用的topic或者尝试对内部topic(__consumer_offset)进行写入操作时
  RecordListTooLargeCode
  18

  如果produce的message batch超过了配置的segment size
  NotEnoughReplicasCode
  19
  YES
  处于in-sync的replicas数量小于配置的produce要求的最小replicas和requiredAcks=-1
  NotEnoughReplicasAfterAppendCode
  20
  YES
  当message被写入到log后,但是in-sync的replicas数小于需要的
  InvalidRequiredAcksCode
  21

  请求的requiredAcks是不可获得的
  IllegalGenerationCode
  22

  server端的generation id和request中的generation id不一致
  InconsistentGroupProtocolCode
  23

  当前group能够接受的protocol type中不包含join group时给出的protocol type
  InvalidGroupIdCode
  24

  当join group时groupId为空或者null
  UnknownMemberIdCode
  25

  当前generation里group中不包含请求的memberId
  InvalidSessionTimeoutCode
  26

  join group时超出了配置的requestsession timeout
  RebalanceInProgressCode
  27

  当请求发起时coodinator正在对group进行rebalance,此时client要重新join group
  InvalidCommitOffsetSizeCode
  28

  当offset commit超过metadata的最大限制被拒绝时
  TopicAuthorizationFailedCode
  29

  client没有访问请求的topic的权限时
  GroupAuthorizationFailedCode
  30


  ClusterAuthorizationFailedCode
  31


  kafka实现了基于tcp的一种通信协议,只要符合通信协议的规范,即可与kafka server进行通信,因而kafka client是跨语言的
3. 消息中间件&存储系统
  kafka既可以被认为是消息中间件,也可以作为存储系统使用
  由于kafka可以将producer发送的消息保存起来供consumer消费,因此既可以作为消息中间件使用,也可以作为存储系统来保存数据。
4. 存储逻辑层的高并发保证
  kafka在存储逻辑层的设计为高吞吐量提供了可能
  kafka存储数据的逻辑单元是partition,producer和consumer的处理单元也是基于partition的,针对某个topic,可以有多个partition,而多个partition又可以分布在不同的节点上,从而在存储层保证了I/O的并发,为高吞吐量提供了可能。
5. isr机制降低了保证分布式一致性的代价
  kafka的isr同步机制使得保证分布式一致性的代价大大降低
  kafka的isr机制,允许isr中的replica和主副本之前有一定的差距,这样做保证了响应的及时性,另一方面,由于在isr层面没有考虑严格的分布式一致性,没有使用paxos的leader选举策略,使得kafka的leader选举更加容易,没有严格的节点数要求的限制,只要有一个节点是isr中的,就不会丢数据。
  




页: [1]
查看完整版本: Kafka系列之1—Kafka的总体认识