【Kafka 1】Getting Started with Kafka
This article is extracted from "Spark Integration with Kafka" (http://bit1129.iteye.com/blog/2174765) and republished on its own as a Kafka quick start.
Download Kafka
http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
In the file name, 2.10 is the Scala version and 0.8.1.1 is the Kafka version.
Extract Kafka
Somewhat surprisingly, the Kafka distribution bundles a ZooKeeper installation (a fairly old one, version 3.3.4) along with scripts to start and stop it. In principle the bundled version should not be used, because ZooKeeper is a general-purpose distributed configuration and coordination service and is best deployed independently.
In practice the bundled ZooKeeper also works. Note, however, that a ZooKeeper ensemble is usually made up of 3 servers; if there are more than 3 Kafka brokers, pick 3 of them to run ZooKeeper.
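If you do decide to use the bundled ZooKeeper, it is started with the script shipped in Kafka's bin directory (a minimal sketch; config/zookeeper.properties is the default ZooKeeper configuration file included in the Kafka 0.8.1.1 distribution):
$ bin/zookeeper-server-start.sh config/zookeeper.properties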
Configure Kafka
1. Edit the configuration file config/server.properties
host.name and advertised.host.name are commented out by default; uncomment them. In general, do not set host.name to localhost: if you do, remote clients will have trouble reaching the Kafka server. (An example with a real hostname follows the snippet below.)
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=localhost
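A minimal sketch of these two properties set to a real hostname (hadoop.master is simply the hostname that appears in the startup log later in this article; substitute your own machine's name):
host.name=hadoop.master
advertised.host.name=hadoop.master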
2. Configure ZooKeeper
This article uses a separately installed ZooKeeper rather than the one bundled with Kafka. So that Kafka knows the address of the ZooKeeper it should connect to, the configuration files provide a set of ZooKeeper-related parameters.
Besides running an independent ZooKeeper, Kafka can also use the ZooKeeper included in its own package; in that case it is started with the script under Kafka's bin directory. Conversely, when an independent ZooKeeper is used, there is no need to start the one shipped with Kafka. The ZooKeeper-related messages seen during Kafka's startup are simply Kafka, acting as a ZooKeeper client, establishing communication with the ZooKeeper server.
[*] config/server.properties
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# 2181 is ZooKeeper's clientPort
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
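For a real ensemble, the connect string lists every ZooKeeper server and may append an optional chroot path under which all Kafka znodes are kept, as the comment above describes (the hostnames and the /kafka chroot below are illustrative):
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka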
[*] config/producer.properties
No ZooKeeper-related settings here.
[*] config/consumer.properties
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
Start ZooKeeper
1. To match the ZooKeeper settings in the Kafka configuration above, ZooKeeper's clientPort must be 2181.
2. Start ZooKeeper with the standalone installation's start script (the command is shown after the configuration below). The ZooKeeper configuration (conf/zoo.cfg) used here is:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/hadoop/software/zookeeper-3.4.6/data
# the port at which the clients will connect
clientPort=2181
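A sketch of starting the standalone ZooKeeper 3.4.6 installation referenced by dataDir above (run from the ZooKeeper installation directory; zkServer.sh is the standard ZooKeeper control script):
$ bin/zkServer.sh start
$ bin/zkServer.sh status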
Start Kafka
1. Start Kafka (the server.properties file must be specified)
$ bin/kafka-server-start.sh config/server.properties
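To keep the broker running after the terminal is closed, a common approach (not part of the original article) is to start it in the background, for example:
$ nohup bin/kafka-server-start.sh config/server.properties > kafka-server.log 2>&1 &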
2. Startup log
[2015-01-11 01:11:12,490] INFO Verifying properties (kafka.utils.VerifiableProperties)
// broker.id is defined in server.properties
INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
INFO Property log.dirs is overridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)
INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
INFO , starting (kafka.server.KafkaServer)
INFO , Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
INFO Client environment:host.name=hadoop.master (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.version=1.7.0_67 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.home=/home/hadoop/software/jdk1.7.0_67/jre (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.class.path=:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/scala-library-2.10.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.dir=/home/hadoop/software/kafka_2.10-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7a50a6d4 (org.apache.zookeeper.ZooKeeper)
INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x14ad79bb13d0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
// log.dirs does not exist yet, so it is created first
INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
INFO , Started (kafka.network.SocketServer)
INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
// There is only one broker, so broker 0 wins the controller (leader) election carried out through ZooKeeper
INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
// The broker registers itself under the ZooKeeper path /brokers/ids/0
INFO Registered broker 0 at path /brokers/ids/0 with address hadoop.master:9092. (kafka.utils.ZkUtils$)
INFO , started (kafka.server.KafkaServer)
INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
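One way to confirm the registration shown in the log is to inspect the znode with the ZooKeeper command-line client (zkCli.sh ships with the standalone ZooKeeper installation; the path is the one printed above):
$ bin/zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] get /brokers/ids/0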
A Simple Kafka Test
1. Create a topic
Parameters: the ZooKeeper connection string, the topic name, the number of partitions, and the replication factor (the replication factor must be less than or equal to the number of brokers).
// Create a topic named test
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
// List the created topics; here there is only test
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
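To see the partition and replica assignment of the new topic, the same script can be run with --describe (a usage sketch; the flag is part of kafka-topics.sh in 0.8.1.1):
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test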
2. The producer creates messages
At startup nothing is printed apart from the SLF4J warnings; you can then type the messages to be produced directly. (When producing, there is no need to specify a partition: Kafka routes messages to partitions automatically. Each partition consists of a leader and a set of followers; the leader handles reads and writes, the followers only replicate, and if the leader fails one of the followers automatically takes over.)
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
This is mesage
This is a test
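The console producer reads from standard input, so an existing file can also be piped in (a sketch; messages.txt is a hypothetical file containing one message per line):
$ cat messages.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test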
3. The consumer consumes messages
At startup nothing is printed apart from the SLF4J warnings.
--from-beginning controls where the consumer starts reading. The underlying setting is auto.offset.reset: when there is no committed offset for the consumer group in ZooKeeper (for example, the group has never read from this topic before), or when the committed offset is out of range, Kafka has to decide where to start. smallest means start from the earliest available message, i.e. read from the beginning; largest (the default) means start from the latest offset and only read messages produced afterwards. --from-beginning makes the console consumer use the smallest behaviour.
auto.offset.reset (default: largest):
What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer
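If the high-level consumer is used through config/consumer.properties instead of the console tool's flag, the same behaviour can be requested there (a sketch of an assumed addition to that file, not part of the original configuration):
# start from the earliest available offset when no committed offset exists
auto.offset.reset=smallest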
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
This is mesage
This is a test
4. Now anything typed at the producer terminal arrives immediately at the consumer terminal.
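Since the high-level consumer commits its offsets to ZooKeeper, they can also be inspected there (a sketch using zkCli.sh; console-consumer-12345 stands in for the group id that the console consumer generates, so substitute the actual group name listed under /consumers):
$ bin/zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /consumers
[zk: localhost:2181(CONNECTED) 1] get /consumers/console-consumer-12345/offsets/test/0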