主题:Kafka maintains feeds of messages in categories called topics.
生产者:We’ll call processes that publish messages to a Kafka topic producers.
消费者:We’ll call processes that subscribe to topics and process the feed of published messages consumers.
代理(Broker):Kafka is run as a cluster comprised of one or more servers each of which is called a broker.
生产者通过网路将消息发送到Kafka集群上,集群依次(轮流)服务消息到达消费者。 Kafka运行在一个集群中,集群中的每一个服务器就叫代理。
每一个Partition都是有序的,固定长度的消息队列一直不断增加到–一个提交日志。消息在Partition内分配了顺序的id叫偏移量,这个偏移量在分区中唯一标识每个消息的。
Kafka保存所有(一段时间内的-可配置)已经发布的消息-无论它们是否已经被消费。例如,如果日志保留被设置为两天,那么在一个消息发布后,两天内它是可用的,两天后它将被丢弃到空闲空间
事实上,元数据保留在每个消费进程中,是基于消费进程在日志中的位置,该位置称为“偏移量”(In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the “offset”.)。这个偏移量被消费者控制:正常的消费者读取消息时,线性增加偏移量,但事实上消费者可以以任何它顺序的方式来控制。例如:一个消费者可以重置到以前的偏移量位置来重新处理。
这种组合的特点意味着Kafka的消费者是很廉价的-消费者进程可以随时增加减少,对集群和其它消费者进程没有任何影响。例如:你可以使用命令行工具输出任何主题的内容,而不改变任何现有的消费者所消耗的。
日志中的分区服务几个目的。首先,日志的规模大小可以调整,远不是只有一个在一个服务器上。每个单独的分区都必须安装在主机上的服务器上,一个主题可以有许多分区,所以它可以处理任意数量的数据。第二,它们都是独立相互平行的。
public> public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
//“所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
props.put("acks", "all");
//如果请求失败,生产者也会自动重试,即使设置成0 the producer can automatically retry.
props.put("retries", 0);
//The producer maintains buffers of unsent records for each partition.
props.put("batch.size", 16384);
//默认立即发送,这里这是延时毫秒数
props.put("linger.ms", 1);
//生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
props.put("buffer.memory", 33554432);
//The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建kafka的生产者类
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//生产者的主要方法
// close();//Close this producer.
// close(long timeout, TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests.
// flush() ;所有缓存记录被立刻发送
for(int i = 0; i < 100; i++)
//这里平均写入4个分区
producer.send(new ProducerRecord<String, String>("foo",i%4, Integer.toString(i), Integer.toString(i)));
producer.close();
}
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
System.out.println("this is the group part test 1");
//消费者的组id
props.put("group.id", "GroupA");//这里是GroupA或者GroupB