设为首页 收藏本站
查看: 1030|回复: 0

[经验分享] 【Spark二一】Spark Streaming结合Kafka

[复制链接]

尚未签到

发表于 2017-5-23 17:16:45 | 显示全部楼层 |阅读模式
  本篇运行Spark Streaming自带的例子KafkaWorkCount,为运行这个例子,需要搭建环境,具体的讲,需要


  • 安装运行Kafka
  • 安装运行Zookeeper(因为Kafka的运行依赖于Zookeeper以注册Topic到Zookeeper上)   ---,除了安装运行独立的Zookeeper,Kafka也可以使用安装包里的Zookeeper,如果Kafka要使用自己的Zookeeper,那么需要在Kafka的bin目录下启动Zookeeper。因此,如果使用独立的Zookeeper的时候,就无需启动Kafka下面的Zookeeper了。在Kafka启动过程中看到有关Zookeeper的日志,这是Kafka作为Zookeeper的客户端正在建立与Zookeeper服务器的通讯

  • 运行Spark Streaming

安装Kafka
   1. 下载Kafka

http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
     2.10表示Scala的版本,而0.8.1.1表示Kafka的版本

  2. 解压Kafka
      惊讶的是Kafka内置了Zookeeper的安装包以及启停Zookeeper的脚本,版本比较低,是3.3.4版本。理论上不应该使用Kafka的版本,因为Zookeeper是个通用分布式配置和协调系统。

配置Kafka
  1. 修改配置文件config/server.properties
     host.name和avertised.host.name默认是注释掉的,把它打开

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=localhost
  2. 配置Zookeeper

      本文使用单独安装的Zookeeper,而不是使用Kafka自带的Zookeeper,Kafka为了能够知道它要连接的Zookeeper地址,配置文件中提供了一系列和Zookeeper相关的配置参数


  • config/server.properties

############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
//2181是Zookeeper的clientPort
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
   


  • config/producer.properties
      无相关配置


  • config/consumer.properties

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
   

启动Zookeeper
  1.根据之前Kafka对Zookeeper的配置,Zookeeper应该配置端口2181端口
  2. 使用如下命令启动Zookeeper,启动Zookeeper的参数如下:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/hadoop/software/zookeeper-3.4.6/data
# the port at which the clients will connect
clientPort=2181
 
启动Kafka
  1. 启动Kafka

[hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties
  2.启动日志:
     从启动日志中,貌似Kafka是在启动自身的Zookeeper,如果启动自身的Zookeeper也是可以理解的,因为Kafka依赖Zookeeper,只要维护好自己的这一份Zookeeper,而无需关心独立的Zookeeper。

[2015-01-11 01:11:12,490] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,558] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,558] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,558] INFO Property log.dirs is overridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,558] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,559] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,559] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,559] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,559] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,559] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,559] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,559] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,560] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,560] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,560] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,560] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2015-01-11 01:11:12,607] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2015-01-11 01:11:12,609] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2015-01-11 01:11:12,640] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:host.name=hadoop.master (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:java.version=1.7.0_67 (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:java.home=/home/hadoop/software/jdk1.7.0_67/jre (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:java.class.path=:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/scala-library-2.10.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,640] INFO Client environment:user.dir=/home/hadoop/software/kafka_2.10-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,641] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7a50a6d4 (org.apache.zookeeper.ZooKeeper)
[2015-01-11 01:11:12,643] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2015-01-11 01:11:12,706] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2015-01-11 01:11:12,716] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2015-01-11 01:11:12,756] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x14ad79bb13d0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2015-01-11 01:11:12,759] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2015-01-11 01:11:12,919] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2015-01-11 01:11:12,948] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2015-01-11 01:11:12,975] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2015-01-11 01:11:13,039] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2015-01-11 01:11:13,063] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2015-01-11 01:11:13,163] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2015-01-11 01:11:13,219] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-01-11 01:11:13,367] INFO Registered broker 0 at path /brokers/ids/0 with address hadoop.master:9092. (kafka.utils.ZkUtils$)
[2015-01-11 01:11:13,379] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-01-11 01:11:13,486] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
 
Kafka测试
  1. ///创建一个Topic,取名为test

[hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic test
Created topic "test".
///列出创建的Topic,这里只有一个test
[hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
   2. Producer创建消息
     启动时,除了打印SLF4J之外,没有别的。下面可以直接输入生产的数据

[hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

This is mesage
This is a test
   3. Consumer消费消息
      启动时,除了打印SLF4J之外,没有别的

[hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

This is mesage
This is a test
 
 运行Spark Stream的Kafka实例
  1. 首先关掉在测试Kafka能够正常工作而开启的Producer和Consumer
  2. 运行KafkaWordCountProducer

bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
   
  参数说明:


  • KafkaWordCountProducer producer的类名
  • localhost:9092表示Kafka服务器的IP和端口
  • test表示topic
  • 3表示每秒发多少条消息
  • 5表示每条消息中有几个单词
   3. 运行KafkaWordCount

bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1
  参数说明 


  • KafkaWordCount表示消息的消费者
  • localhost:2181表示Zookeeper的客户端监听IP和端口
  • test-consumer-group表示consumer的分组,这个分组是在Kafka的config/consumer.properties中配置的,有多个分组怎么办?

#consumer group id
group.id=test-consumer-group
   


  • test表示topic
  • 1表示线程数

 总结:
  1. 同NetworkWordCount一样,这个例子运行起来后也没看到结果在那里输出,这究竟是什么地方出问题了???
  参考:http://www.cnblogs.com/hseagle/p/3887507.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379888-1-1.html 上篇帖子: Kafka启动的流程 下篇帖子: Kafka-消息中间件
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表