每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创建topic时可以在 $KAFKA_HOME/config/server.properties 中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量。
# The default number of log partitions per topic. More partitions allow greater# parallelism for consumption, but this will also result in more files across# the brokers.num.partitions=3 在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。paritition机制可以通过指定producer的paritition.class这一参数来指定,该class必须实现 kafka.producer.Partitioner 接口。本例中如果key可以被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每个parition都会有个序号)
import kafka.producer.Partitioner;import kafka.utils.VerifiableProperties;public class JasonPartitioner implements Partitioner {public JasonPartitioner(VerifiableProperties verifiableProperties) {}@Override public int partition(Object key, int numPartitions) {try {int partitionNum = Integer.parseInt((String) key);return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {return Math.abs(key.hashCode() % numPartitions);
}
}
}
如果将上例中的class作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic2(包含4个partition)。
public void sendMessage() throws InterruptedException{
for(int i = 1; i {thisbroker
id},如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新controller的命令。
If the consumer creates a message stream using a topic filter, it
also registers a watch on changes under the broker topic registry.
Force itself to rebalance within in its consumer group.
在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumerrebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer
group的一致性,所以当一个consumer触发了rebalance时,该consumer
group内的其它所有consumer也应该同时触发rebalance。
Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
Kafka的delivery guarantee
semantic非常直接。当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。这一点有点像向一个自动生成primarykey的数据库表中插入数据。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于primary
key的东西,发生故障时幂等性的retry多次,这样就做到了 Exactly one 。截止到目前(Kafka 0.8.2版本,2015-01-25),这一feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从producer和broker是确保了 At least once ,但可通过设置producer异步发送实现 At most once )。
接下来讨论的是消息从broker到consumer的delivery guarantee semantic。(仅针对Kafka
consumer high level
API)。consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了 Exactly once 。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once
读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于 At least once 。在很多情况使用场景下,消息都有一个primary key,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是 Exactly once。(人个感觉这种说法有些牵强,毕竟它不是Kafka本身提供的机制,而且primary
key本身不保证操作的幂等性。而且实际上我们说delivery guarantee
semantic是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们的系统不应该把处理过程的特性—如是否幂等性,当成Kafka本身的feature)
如果一定要做到 Exactly once ,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现 Exactly once 。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
总之,Kafka默认保证 At least once ,并且允许通过设置producer异步提交来实现 At most once 。而 Exactly once 要求与目标存储系统协作,幸运的是Kafka提供的offset可以使用这种方式非常直接非常容易。
Benchmark
纸上得来终觉浅,绝知些事要躬行。笔者希望能亲自测一下Kafka的性能,而非从网上找一些测试数据。所以笔者曾在0.8发布前两个月做过详细的Kafka0.8性能测试,不过很可惜测试报告不慎丢失。所幸在网上找到了Kafka的创始人之一的 Jay Kreps的bechmark 。以下描述皆基于该benchmark。(该benchmark基于Kafka0.8.1) 测试环境
该benchmark用到了六台机器,机器配置如下
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196Throughput Versus Stored Data
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196Effect of message sizefor i in 10 100 1000 10000 100000;doecho ""echo $ibin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test $((1000*1024*1024/$i)) $i -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=128000done;
Consumer
Consumer throughput
bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 13 Consumers
On three servers, run:bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1End-to-end Latency
bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency esv4-hcl198.grid.linkedin.com:9092 esv4-hcl197.grid.linkedin.com:2181 test 5000Producer and consumer
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1
broker配置如下
############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.broker.id=0############################# Socket Server Settings ############################## The port the socket server listens onport=9092# Hostname the broker will bind to and advertise to producers and consumers.# If not set, the server will bind to all interfaces and advertise the value returned from# from java.net.InetAddress.getCanonicalHostName().#host.name=localhost# The number of threads handling network requestsnum.network.threads=4# The number of threads doing disk I/Onum.io.threads=8# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=1048576# The receive buffer (SO_RCVBUF) used by the socket serversocket.receive.buffer.bytes=1048576# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600############################# Log Basics ############################## The directory under which to store log fileslog.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs# The number of logical partitions per topic per server. More partitions allow greater parallelism# for consumption, but also mean more files.num.partitions=8############################# Log Flush Policy ############################## The following configurations control the flush of data to disk. This is the most# important performance knob in kafka.# There are a few important trade-offs here:# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).# 3. Throughput: The flush is generally the most expensive operation. # The settings below allow one to configure the flush policy to flush data after a period of time or# every N messages (or both). This can be done globally and overridden on a per-topic basis.# Per-topic overrides for log.flush.interval.ms#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can# be set to delete segments after a period of time, or after a given size has accumulated.# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens# from the end of the log.# The minimum age of a log file to be eligible for deletionlog.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining# segments don't drop below log.retention.bytes.#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.log.segment.bytes=536870912# The interval at which log segments are checked to see if they can be deleted according # to the retention policieslog.cleanup.interval.mins=1############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).# This is a comma separated host:port pairs, each corresponding to a zk# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".# You can also append an optional chroot string to the urls to specify the# root directory for all kafka znodes.zookeeper.connect=esv4-hcl197.grid.linkedin.com:2181# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=1000000# metrics reporter propertieskafka.metrics.polling.interval.secs=5kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics# Disable csv reporting by default.kafka.csv.metrics.reporter.enabled=falsereplica.lag.max.messages=10000000