设为首页 收藏本站
查看: 1047|回复: 0

[经验分享] Kafka快速上手-中文文档

[复制链接]

尚未签到

发表于 2019-1-31 10:12:38 | 显示全部楼层 |阅读模式
  文章来源:http://kafka.apache.org/quickstart
  本教程假定您正在开始尝鲜,没有现有的Kafka或ZooKeeper数据。
  Step 1: 下载kafka
  官方网站:下载最新版本的kafka压缩包
  http://kafka.apache.org
> tar -xzf kafka_2.12-0.10.2.1.tgz  
> cd kafka_2.12-0.10.2.1
  
  Step 2: 启动kafka
  Kafka使用ZooKeeper,所以您需要先启动一个ZooKeeper服务器,如果您还没有。您可以使用随kafka一起打包的便捷脚本来获取一个快速而又脏的单节点ZooKeeper实例。
> bin/zookeeper-server-start.sh config/zookeeper.properties  
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  
...
  启动Kafka server:
> bin/kafka-server-start.sh config/server.properties  
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
  
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
  
...
  Step 3: 新建主题
  创建一个名为"test"的主题,只有一个副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  查看新创建的test的主题:
> bin/kafka-topics.sh --list --zookeeper localhost:2181  
test
  或者,代替手动创建主题,您也可以配置为在不存在的主题发布时自动创建主题。
  Step 4: 启动producer端发送消息
  Kafka附带一个命令行客户端,它将从文件或标准输入中输入,并将其作为消息发送到Kafka群集。默认情况下,将按行分割作为单独的消息发送。
  运行生产者,然后在控制台中输入一些消息以发送到服务器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  
This is a message
  
This is another message
  Step 5: 启动consumer端接收消息
  Kafka还有一个consumer端命令行,将把消息转储到标准输出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning  
This is a message
  
This is another message
  如果您将上述每个命令都运行在不同的终端中,那么您现在应该可以在producer端中输入消息,并看到它们出现在consumer端中。
  所有命令行工具都有其他选项; 运行不带任何参数的命令将显示更详细地记录它们的使用信息。
  Step 6: 配置多代理Kafka集群
  到目前为止,我们一直在针对一个单实例的Kafka,但这没有乐趣。对于Kafka,单个代理只是一个大小为1的集群,所以没有什么改变,除了启动更多的代理实例。但是为了让它感觉到,我们将集群扩展到三个节点(仍然在本地机器上)。
  首先我们为每个brokers制作一个配置文件
> cp config/server.properties config/server-1.properties  
> cp config/server.properties config/server-2.properties
  编辑这些新的配置文件并设置以下属性:
config/server-1.properties:  
    broker.id=1
  
    listeners=PLAINTEXT://:9093
  
    log.dir=/tmp/kafka-logs-1
  
config/server-2.properties:
  
    broker.id=2
  
    listeners=PLAINTEXT://:9094
  
    log.dir=/tmp/kafka-logs-2
  broker.id属性是集群中每个节点的唯一和永久名称。我们必须覆盖端口和日志目录,只因为我们在同一台机器上运行这些目录,我们希望保持brokers不要在同一个端口上注册或覆盖对方的数据。
  我们已经有Zookeeper,我们的单个节点启动了,所以我们只需要启动两个新节点:
> bin/kafka-server-start.sh config/server-1.properties &  
...
  
> bin/kafka-server-start.sh config/server-2.properties &
  
...
  现在来创建一个复制因子为3的主题:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic  好的,现在我们有一个集群,我们怎么知道哪个broker在做什么?通过运行“describe topics”命令查看:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic  
Topic:my-replicated-topicPartitionCount:1ReplicationFactor:3Configs:
  
Topic: my-replicated-topicPartition: 0Leader: 1Replicas: 1,2,0Isr: 1,2,0
  以下是对输出信息的说明。第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们这个主题只有一个分区,所以只有一行。
  “leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
  “replicas”是复制此分区的日志的节点列表,无论它们是领先者还是目前都是活着的。
  “isr”是一组“in-sync”副本。这是副本列表的子集,该列表目前是活跃的,并且被追加到领导者。
  请注意,在上面的示例中,节点1是主题唯一分区的领导者。
  我们可以在原始创建的主题上运行相同的命令来查看它的位置:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test  
Topic:testPartitionCount:1ReplicationFactor:1Configs:
  
Topic: testPartition: 0Leader: 0Replicas: 0Isr: 0
  所以没有什么惊喜 - 原来的主题没有复制品,在服务器0上,是我们集群中创建的唯一的服务器。
  让我们发布一些消息给我们的新主题:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic  
...
  
my test message 1
  
my test message 2
  
^C
  现在我们来看看消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic  
...
  
my test message 1
  
my test message 2
  
^C
  现在让我们来测试容错。broker1作为领导者,所以让我们kill掉它:
> ps aux | grep server-1.properties  
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
  
> kill -9 7564
  领导已经切换到从节点中的其中一个,节点1不再处于in-sync中:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic  
Topic:my-replicated-topicPartitionCount:1ReplicationFactor:3Configs:
  
Topic: my-replicated-topicPartition: 0Leader: 2Replicas: 1,2,0Isr: 2,0
  但是尽管最初采取写作的领导者的节点为down状态,消息仍然可以消耗掉:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic  
...
  
my test message 1
  
my test message 2
  
^C
  Step 7: 使用Kafka Connect导入导出数据
  从控制台编写数据并将其写回到控制台是一个方便的开始的地方,但您可能希望使用其他来源的数据导入Kafka或将数据从Kafka导出到其他系统。对于许多系统,不用编写自定义集成代码,您可以直接使用Kafka Connect导入或导出数据。
  Kafka Connect是Kafka包含的工具,用于将数据导入和导出到Kafka。它是一种运行连接器的可扩展工具 ,它实现与外部系统交互的自定义逻辑。在这个快速启动中,我们将看到如何使用Kafka Connect实现从文件导入数据到Kafka主题与将数据从Kafka主题导出到文件。
  首先,我们将首先创建一些数据供测试用:
> echo -e "foo\nbar" > test.txt  接下来,我们将启动以独立模式运行的两个连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。第一个是Kafka Connect进程的配置,包含常见配置,如连接的Kafka经纪人和数据的序列化格式。其余的配置文件都指定要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的任何其他配置。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties  Kafka附带的这些示例配置文件使用您之前启动的默认本地群集配置,并创建两个连接器:第一个是源接口,它从输入文件读取行,并生成每个到Kafka主题,第二个是接收器连接器它从Kafka主题读取消息,并将其作为输出文件中的一行生成。
  在启动期间,您将看到一些日志消息,其中包括一些表示连接器正在实例化的消息。一旦Kafka Connect进程开始,源连接器应该开始读取线路test.txt并将其生成到主题connect-test,并且接头连接器应该开始从主题读取消息并将其connect-test 写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否通过整个流水线传送:
> cat test.sink.txt  
foo
  
bar
  注意,这些数据存储在Kafka主题中connect-test,因此我们还可以运行consumer控制台来查看主题中的数据(或使用自定义消费者代码来处理它):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning  
{"schema":{"type":"string","optional":false},"payload":"foo"}
  
{"schema":{"type":"string","optional":false},"payload":"bar"}
  
...
  连接器继续处理数据,因此我们可以将数据添加到文件中,并通过管道移动:
> echo "Another line" >> test.txt  您应该在consumer控制台输出看到显示信息和接收器文件中。
  Step 8: 使用Kafka Streams处理数据
  Kafka Streams是Kafka的客户端库,用于实时流处理和分析存储在Kafka broker中的数据。此快速入门示例将演示如何运行在此库中编码的流应用程序。这是WordCountDemo示例代码的要点(转换为使用Java 8 lambda表达式以便于阅读)。
// Serializers/deserializers (serde) for String and Long types  
final Serde stringSerde = Serdes.String();
  
final Serde longSerde = Serdes.Long();
  
// Construct a `KStream` from the input topic ""streams-file-input", where message values
  
// represent lines of text (for the sake of this example, we ignore whatever may be stored
  
// in the message keys).
  
KStream textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");
  
KTable wordCounts = textLines
  
    // Split each text line, by whitespace, into words.
  
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  
    // Group the text words as message keys
  
    .groupBy((key, value) -> value)
  
    // Count the occurrences of each word (message key).
  
    .count("Counts")
  
// Store the running counts as a changelog stream to the output topic.
  
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
  它实现了WordCount算法,该算法从输入文本中计算单词出现直方图。但是,与以前在有限数据上操作的其他WordCount示例不同,WordCount演示应用程序的行为略有不同,因为它被设计为在无限的***数据流上运行。与有界变体类似,它是一种有状态的算法,可以跟踪和更新单词的计数。然而,由于它必须承担潜在的无限制的输入数据,所以它会周期性地输出其当前状态和结果,同时继续处理更多的数据,因为它不知道何时处理了“全部”输入数据。
  作为第一步,我们将为Kafka主题准备输入数据,随后由Kafka Streams应用程序处理。
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt  接下来,我们使用producer控制台将此输入数据发送到名为streams-file-input的输入主题,该控制台生产者逐行读取标准输入中的数据,并将每行作为单独的Kafka消息以空键和值编码字符串的主题(实际上,流数据可能会连续流入Kafka应用程序将启动和运行):
> bin/kafka-topics.sh --create \  
            --zookeeper localhost:2181 \
  
            --replication-factor 1 \
  
            --partitions 1 \
  
            --topic streams-file-input
  
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
  现在我们可以运行WordCount演示应用程序来处理输入数据:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo  演示应用程序将从输入主题streams-file-input读取,对每个读取的消息执行WordCount算法的计算,并将其当前结果持续写入输出主题streams-wordcount-output。因此,除了日志条目之外,不会有任何标准输出,因为结果被写回Kafka。该演示将运行几秒钟,然后,与典型的流处理应用程序不同,它会自动终止。
  我们现在可以通过从其输出主题中读取来检查WordCount演示应用程序的输出:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \  
            --topic streams-wordcount-output \
  
            --from-beginning \
  
            --formatter kafka.tools.DefaultMessageFormatter \
  
            --property print.key=true \
  
            --property print.value=true \
  
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
  以下输出数据输出到控制台:
all     1  
lead    1
  
to      1
  
hello   1
  
streams 2
  
join    1
  
kafka   3
  
summit  1
  这里,第一列是java.lang.String格式的Kafka消息键,第二列是java.lang.Long格式的消息值。请注意,输出实际上是一个连续的更新流,其中每个数据记录(即上面的原始输出中的每一行)是单个字的更新计数,也称为“kafka”的记录键。对于具有相同键的多个记录,每个后续记录是上一个记录的更新。
  下面的两幅图表说明了幕后实际发生的情况。第一列显示了当前状态的KTable演进,即计算出现的字数count。第二列显示从状态更新到KTable而导致的更改记录,并将其发送到输出Kafka主题streams-wordcount-output。
  首先,正在处理文本行“所有流流向Kafka”。将KTable被建成为一个新的表项(以绿色背景高亮显示),每一个新词结果,并有相应的变化记录发送到下游KStream。
  当处理第二条文本行“hello kafka streams”时,我们首次观察到KTable正在更新的现有条目(此处为“kafka”和“streams”)。此外,更改记录正在发送到输出主题。
  等等(我们跳过第三行如何处理的图示)。这就解释了为什么输出主题具有我们上面显示的内容,因为它包含完整的更新记录。
  超越这个具体例子的范围,Kafka Streams在这里做的是利用表和changelog流之间的二元性(这里:table = KTable,changelog stream =下游的KStream):您可以发布表更新到流,如果您从头到尾消耗整个changelog流,则可以重建表的内容。
  现在,您可以将更多的输入消息写入streams-file-input主题,并观察添加到streams-wordcount-output主题的更多消息,反映更新的字数(例如,如上所述使用producer控制台和consumer控制台)。
  您可以通过Ctrl-C停止consumer控制台。
DSC0000.png DSC0001.png




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669932-1-1.html 上篇帖子: DataPipeline联合Confluent Kafka Meetup上海站 下篇帖子: kafka 一些基本知识
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表