设为首页 收藏本站
查看: 1476|回复: 0

[经验分享] 【Spark五十三】Spark Streaming整合Kafka

[复制链接]

尚未签到

发表于 2017-5-23 18:01:15 | 显示全部楼层 |阅读模式
1. Spark Streaming程序代码

package spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object SparkStreamingKakfaWordCount {
def main(args: Array[String]) {
println("Start to run SparkStreamingKakfaWordCount")
val conf = new SparkConf().setAppName("SparkStreamingKakfaWordCount")
val ssc = new StreamingContext(conf, Seconds(3))
val topicMap = "test".split(":").map((_, 1)).toMap
//zookeeper quorums server list
val zkQuorum = "localhost:2181";
//consumer group
val group = "test-consumer-group"
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.print()
//使用如下语句,则print显示在console上是一堆空白的();使用上面的语句则可以正常显示每行文本
//    lines.map(line => {
//      println(line)
//    }).print

ssc.start()
ssc.awaitTermination()
}
}


2. 生产消息到指定的topic
  生产消息,可以有如下两种方式
  1.可以使用Kafka自带的通过console为Kakfa生产消息的命令行

./kafka-console-producer.sh --topic test --broke-list localhost:9081
  2.也可以使用如下Spark Streaming提供的example中的Kafka Message Producer

./run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5

3. 问题
  3.1. KafkaUtils.createStream中的consumerGroupId这个参数指的是consumerGroup的标识,那么什么是consumerGroup?有什么用处? 表示所有的Spark Streaming的Receiver(构建于Kafka的Consumer API之上),同属于一个Consumer Group,因此多个Receiver可以并行消费同一个Topic的不同的partition
  3.2. KafkaUtils.createStream中的topicMap这个参数是一个Map,是topicName跟一个整数的映射,topicName可以理解,那么这个对应的整数作何解?
  看下KafkaUtils.createStream的方法说明。可见每个topicName对应的整数是这个topic的分区数,即每个Receiver可以使用分区数个线程来并发的读取Topic的numPartitions个分区

  /**
* Create an input stream that pulls messages from a Kafka Broker.
* @param ssc       StreamingContext object
* @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId   The group id for this consumer
* @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
*                  in its own thread
* @param storageLevel  Storage level to use for storing the received objects
*                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def createStream(
ssc: StreamingContext,
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, storageLevel)
}
  3.3  为什么下面代码会打印()

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.map(line => {
println(line)
}).print
  首先KafkaUtils.createStream返回的是一个ReceiverInputDStream,它包含的数据类型是二元组(String, String),因此,可以调用(_._2)得到二元组第二个元素,直接调用lines.print方法,DStream应该是把lines这个DStream中的内容打印出来。
  但是使用上面的代码,流程是,调用print,然后调用lines.map做转换,转换的结果是按说应该应该打印结果,然后返回一个空集合?
  之前在做Flume与Spark的对接时,有过类似的处理操作,它的逻辑是:

    val lines = FlumeUtils.createStream(ssc,"localhost",9999)
lines.map(evt => {
val str = new String(evt.event.getBody.array())
("string received: "+ str)
}).print()
  lines.map操作,返回的是一个("string received: " + str)字符串,因此print可以将它打印出来,如果调用println("string received: " + str),估计跟上面Kafka的结果一样

复杂的场景:
  1. Kafka伪分布式安装localhost:9092,localhost:9093,localhost:9094
  2. Topic: 6个partition,2个replica
  3.  Spark Streaming启动6个Receiver,每个Receiver1个线程读取Topic中的一个partition

package spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
//假设Kafka有6个分区
//Spark Streaming创建6个Input DStream,并行读6个分区
//Spark Streaming将RDD重新分区为18个RDD,进行并行处理,处理逻辑的并行度是读取并行度的3倍
object MyKafkaWordCount {
def main(args: Array[String]) {
println("Start to run MyKafkaWordCount")
val conf = new SparkConf().setAppName("MyKafkaWordCount").setMaster("local[20]")
val ssc = new StreamingContext(conf, Seconds(3))
//
val topicMap = Map("topic-p6-r2"->1)
val zkQuorum = "localhost:2181";
val group = "topic-p6-r2-consumer-group"
//Kakfa has 6 partitions, here create 6 Input DStream
val streams = (1 to 6).map ( _ =>
KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
)
///将6个streams进行union
val partitions = ssc.union(streams).repartition(18).map("DataReceived: " + _)
partitions.print()
ssc.start()
ssc.awaitTermination()
}
}

  1.提交application的代码:
  结果:
  1.Spark Streaming收到数据的顺序跟发送的顺序不一致,不一致就对了,因为Kafka保证的partittion内的数据是发送和接收的循序保证,但是不保证,partition之间的顺序性
  2.启动脚本:(因为需要6个Receiver,因此只能使用local模式,给20个worker threads)

./spark-submit --name MyKafkaWordCount --master local[20] --executor-memory 512M --total-executor-cores 2  --class spark.examples.streaming.MyKafkaWordCount Hello2.jar
  有待阅读:
  http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/?utm_source=tuicool
  http://spark.apache.org/docs/latest/streaming-kafka-integration.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379950-1-1.html 上篇帖子: Kafka: High Qulity Posts 下篇帖子: kafka之生产、消费关系
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表