val ssc:StreamingContext=???// ignore for now
val kafkaParams:Map[String,String]=Map("group.id"->"terran",/* ignore rest */)
val numInputDStreams =5
val kafkaDStreams =(1 to numInputDStreams).map {_=>KafkaUtils.createStream(...)}
在这个例子中,我们建立了5个input DStreams,因此从Kafka中读取的工作将分担到5个核心上,寄希望于5个主机/NICs(之所以说是寄希望于,因为我也不确定Spark Streaming task布局策略是否会将receivers投放到多个主机上)。所有Input Streams都是“terran”消费者群的一部分,而Kafka将保证topic的所有数据可以同时对这5个input DSreams可用。换句话说,这种“collaborating”input DStreams设置可以工作是基于消费者群的行为是由Kafka API提供,通过KafkaInputDStream完成。
在这个例子中,我没有提到每个input DSream会建立多少个线程。在这里,线程的数量可以通过KafkaUtils.createStream方法的参数设置(同时,input topic的数量也可以通过这个方法的参数指定)。在下一节中,我们将通过实际操作展示。
但是在开始之前,在这个步骤我先解释几个Spark Streaming中常见的几个问题,其中有些因为当下Spark中存在的一些限制引起,另一方面则是由于当下Kafka input DSreams的一些设置造成:
当你使用我上文介绍的多输入流途径,而这些消费者都是属于同一个消费者群,它们会给消费者指定负责的分区。这样一来则可能导致syncpartitionrebalance的失败,系统中真正工作的消费者可能只会有几个。为了解决这个问题,你可以把再均衡尝试设置的非常高,从而获得它的帮助。然后,你将会碰到另一个坑——如果你的receiver宕机(OOM,亦或是硬件故障),你将停止从Kafka接收消息。
Spark用户讨论 markmail.org/message/…
这里,我们需要对“停止从Kafka中接收”问题 做一些解释 。当下,当你通过ssc.start()开启你的streams应用程序后,处理会开始并一直进行,即使是输入数据源(比如Kafka)变得不可用。也就是说,流不能检测出是否与上游数据源失去链接,因此也不会对丢失做出任何反应,举个例子来说也就是重连或者结束执行。类似的,如果你丢失这个数据源的一个receiver,那么 你的流应用程序可能就会生成一些空的RDDs 。
这是一个非常糟糕的情况。最简单也是最粗糙的方法就是,在与上游数据源断开连接或者一个receiver失败时,重启你的流应用程序。但是,这种解决方案可能并不会产生实际效果,即使你的应用程序需要将Kafka配置选项auto.offset.reset设置到最小——因为Spark Streaming中一些已知的bug,可能导致你的流应用程序发生一些你意想不到的问题,在下文Spark Streaming中常见问题一节我们将详细的进行介绍。
选择2:控制每个input DStream上小发着线程的数量
在这个例子中,我们将建立一个单一的input DStream,它将运行3个消费者线程——在同一个receiver/task,因此是在同一个core/machine/NIC上对Kafka topic “zerg.hydra”进行读取。
val ssc:StreamingContext=???// ignore for now
val kafkaParams:Map[String,String]=Map("group.id"->"terran",...)
val consumerThreadsPerInputDstream =3
val topics =Map("zerg.hydra"-> consumerThreadsPerInputDstream)
val stream =KafkaUtils.createStream(ssc, kafkaParams, topics,...)
KafkaUtils.createStream方法被重载,因此这里有一些不同方法的特征。在这里,我们会选择Scala派生以获得最佳的控制。
结合选项1和选项2
下面是一个更完整的示例,结合了上述两种技术:
val ssc:StreamingContext=???
val kafkaParams:Map[String,String]=Map("group.id"->"terran",...)
val numDStreams =5
val topics =Map("zerg.hydra"->1)
val kafkaDStreams =(1 to numDStreams).map{_ =>KafkaUtils.createStream(ssc, kafkaParams, topics,...)}
我们建立了5个input DStreams,它们每个都会运行一个消费者线程。如果“zerg.hydra”topic拥有5个分区(或者更少),那么这将是进行并行读取的最佳途径,如果你在意系统最大吞吐量的话。
Spark Streaming中的并行Downstream处理
在之前的章节中,我们覆盖了从Kafka的并行化读取,那么我们就可以在Spark中进行并行化处理。那么这里,你必须弄清楚Spark本身是如何进行并行化处理的。类似Kafka,Spark将parallelism设置的与(RDD)分区数量有关, 通过在每个RDD分区上运行task进行 。在有些文档中,分区仍然被称为“slices”。
在任何Spark应用程序中,一旦某个Spark Streaming应用程序接收到输入数据,其他处理都与非streaming应用程序相同。也就是说,与普通的Spark数据流应用程序一样,在Spark Streaming应用程序中,你将使用相同的工具和模式。更多详情可见Level of Parallelism in Data Processing 文档。
因此,我们同样将获得两个控制手段: 1. input DStreams的数量 ,也就是说,我们在之前章节中read parallelism的数量作为结果。这是我们的立足点,这样一来,我们在下一个步骤中既可以保持原样,也可以进行修改。 2. DStream转化的重分配 。这里将获得一个全新的DStream,其parallelism等级可能增加、减少,或者保持原样。在DStream中每个返回的RDD都有指定的N个分区。DStream由一系列的RDD组成,DStream.repartition则是通过RDD.repartition实现。接下来将对RDD中的所有数据做随机的reshuffles,然后建立或多或少的分区,并进行平衡。同时,数据会在所有网络中进行shuffles。换句话说,DStream.repartition非常类似Storm中的shuffle grouping。
因此,repartition是从processing parallelism解耦read parallelism的主要途径。在这里,我们可以设置processing tasks的数量,也就是说设置处理过程中所有core的数量。间接上,我们同样设置了投入machines/NICs的数量。
一个DStream转换相关是 union 。这个方法同样在StreamingContext中,它将从多个DStream中返回一个统一的DStream,它将拥有相同的类型和滑动时间。通常情况下,你更愿意用StreamingContext的派生。一个union将返回一个由Union RDD支撑的UnionDStream。Union RDD由RDDs统一后的所有分区组成,也就是说,如果10个分区都联合了3个RDDs,那么你的联合RDD实例将包含30个分区。换句话说,union会将多个 DStreams压缩到一个 DStreams或者RDD中,但是需要注意的是,这里的parallelism并不会发生改变。你是否使用union依赖于你的用例是否需要从所有Kafka分区进行“in one place”信息获取决定,因此这里大部分都是基于语义需求决定。举个例子,当你需要执行一个不用元素上的(全局)计数。 注意: RDDs是无序的。因此,当你union RDDs时,那么结果RDD同样不会拥有一个很好的序列。如果你需要在RDD中进行sort。
你的用例将决定需要使用的方法,以及你需要使用哪个。如果你的用例是CPU密集型的,你希望对zerg.hydra topic进行5 read parallelism读取。也就是说,每个消费者进程使用5个receiver,但是却可以将processing parallelism提升到20。
val ssc:StreamingContext=???
val kafkaParams:Map[String,String]=Map("group.id"->"terran",...)
val readParallelism =5
val topics =Map("zerg.hydra"->1)
val kafkaDStreams =(1 to readParallelism).map{ _ =>KafkaUtils.createStream(ssc, kafkaParams, topics,...)}//> collection of five *input* DStreams = handled by five receivers/tasks
val unionDStream = ssc.union(kafkaDStreams)// often unnecessary, just showcasing how to do it//> single DStream
val processingParallelism =20
val processingDStream = unionDStream(processingParallelism)//> single DStream but now with 20 partitions
在下一节中,我将把所有部分结合到一起,并且联合实际数据处理进行讲解。
写入到Kafka
写入到Kafka需要从foreachRDD输出操作进行:
通用的输出操作者都包含了一个功能(函数),让每个RDD都由Stream生成。这个函数需要将每个RDD中的数据推送到一个外部系统,比如将RDD保存到文件,或者通过网络将它写入到一个数据库。需要注意的是,这里的功能函数将在驱动中执行,同时其中通常会伴随RDD行为,它将会促使流RDDs的计算。 注意: 重提“功能函数是在驱动中执行”,也就是Kafka生产者将从驱动中进行,也就是说“功能函数是在驱动中进行评估”。当你使用foreachRDD从驱动中读取Design Patterns时,实际过程将变得更加清晰。
在这里,建议大家去阅读Spark文档中的 Design Patterns for using foreachRDD一节,它将详细讲解使用foreachRDD读外部系统中的一些常用推荐模式,以及经常出现的一些陷阱。
在我们这个例子里,我们将按照推荐来重用Kafka生产者实例,通过生产者池跨多个RDDs/batches。 我通过 Apache Commons Pool 实现了这样一个工具,已经上传到GitHub 。这个生产者池本身通过 broadcast variable 提供给tasks。
最终结果看起来如下:
val producerPool ={// See the full code on GitHub for details on how the pool is created
val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
ssc.sparkContext.broadcast(pool)}
stream.map {...}.foreachRDD(rdd =>{
rdd.foreachPartition(partitionOfRecords =>{// Get a producer from the shared pool
val p = producerPool.value.borrowObject()
partitionOfRecords.foreach{case tweet:Tweet=>// Convert pojo back into Avro binary format
val bytes = converter.value.apply(tweet)// Send the bytes to Kafka
p.send(bytes)}// Returning the producer to the pool also shuts it down
producerPool.value.returnObject(p)})})
需要注意的是, Spark Streaming每分钟都会建立多个RDDs,每个都会包含多个分区,因此你无需为Kafka生产者实例建立新的Kafka生产者,更不用说每个Kafka消息。上面的步骤将最小化Kafka生产者实例的建立数量,同时也会最小化TCP连接的数量(通常由Kafka集群确定)。你可以使用这个池设置来精确地控制对流应用程序可用的Kafka生产者实例数量。如果存在疑惑,尽量用更少的。
// Set up the input DStream to read from Kafka (in parallel)
val kafkaStream ={
val sparkStreamingConsumerGroup ="spark-streaming-consumer-group"
val kafkaParams =Map("zookeeper.connect"->"zookeeper1:2181","group.id"->"spark-streaming-test","zookeeper.connection.timeout.ms"->"1000")
val inputTopic ="input-topic"
val numPartitionsOfInputTopic =5
val streams =(1 to numPartitionsOfInputTopic) map { _ =>KafkaUtils.createStream(ssc, kafkaParams,Map(inputTopic ->1),StorageLevel.MEMORY_ONLY_SER).map(_._2)}
val unifiedStream = ssc.union(streams)
val sparkProcessingParallelism =1// You'd probably pick a higher value than 1 in production.
unifiedStream.repartition(sparkProcessingParallelism)}// We use accumulators to track global "counters" across the tasks of our streaming app
val numInputMessages = ssc.sparkContext.accumulator(0L,"Kafka messages consumed")
val numOutputMessages = ssc.sparkContext.accumulator(0L,"Kafka messages produced")// We use a broadcast variable to share a pool of Kafka producers, which we use to write data from Spark to Kafka.
val producerPool ={
val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
ssc.sparkContext.broadcast(pool)}// We also use a broadcast variable for our Avro Injection (Twitter Bijection)
val converter = ssc.sparkContext.broadcast(SpecificAvroCodecs.toBinary[Tweet])// Define the actual data flow of the streaming job
kafkaStream.map {case bytes =>
numInputMessages +=1// Convert Avro binary data to pojo
converter.value.invert(bytes) match {caseSuccess(tweet)=> tweet
caseFailure(e)=>// ignore if the conversion failed}}.foreachRDD(rdd =>{
rdd.foreachPartition(partitionOfRecords =>{
val p = producerPool.value.borrowObject()
partitionOfRecords.foreach{case tweet:Tweet=>// Convert pojo back into Avro binary format
val bytes = converter.value.apply(tweet)// Send the bytes to Kafka
p.send(bytes)
numOutputMessages +=1}
producerPool.value.returnObject(p)})})// Run the streaming job
ssc.start()
ssc.awaitTermination()
更多的细节和解释可以在这里看所有源代码。
就我自己而言,我非常喜欢 Spark Streaming代码的简洁和表述。在Bobby Evans和 Tom Graves讲话中没有提到的是,Storm中这个功能的等价代码是非常繁琐和低等级的: kafka-storm-starter 中的 KafkaStormSpec 会运行一个Stormtopology来执行相同的计算。同时,规范文件本身只有非常少的代码,当然是除下说明语言,它们能更好的帮助理解;同时,需要注意的是,在Storm的Java API中,你不能使用上文Spark Streaming 示例中所使用的匿名函数,比如map和foreach步骤。取而代之的是,你必须编写完整的类来获得相同的功能,你可以查看 AvroDecoderBolt 。这感觉是将Spark的API转换到Java,在这里使用匿名函数是非常痛苦的。
最后,我同样也非常喜欢 Spark的说明文档 ,它非常适合初学者查看,甚至还包含了一些 进阶使用 。关于Kafka整合到Spark,上文已经基本介绍完成,但是我们仍然需要浏览mailing list和深挖源代码。这里,我不得不说,维护帮助文档的同学做的实在是太棒了。
知晓Spark Streaming中的一些已知问题
你可能已经发现在Spark中仍然有一些尚未解决的问题,下面我描述一些我的发现:
一方面,在对Kafka进行读写上仍然存在一些含糊不清的问题,你可以在类似Multiple Kafka Receivers and Union 和 How to scale more consumer to Kafka stream mailing list的讨论中发现。
另一方面,Spark Streaming中一些问题是因为Spark本身的固有问题导致,特别是故障发生时的数据丢失问题。换句话说,这些问题让你不想在生产环境中使用Spark。
// Required to gain access to RDD transformations via implicits.import org.apache.spark.SparkContext._
// Required when working on `PairDStreams` to gain access to e.g. `DStream.reduceByKey`// (versus `DStream.transform(rddBatch => rddBatch.reduceByKey()`) via implicits.//// See also http://spark.apache.org/docs/1.1.0/programming-guide.html#working-with-key-value-pairsimport org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
如果你是 Twitter Algebird的爱好者,你将会喜欢使用Count-Min Sketch和Spark中的一些特性,代表性的,你会使用reduce或者reduceByWindow这样的操作(比如,DStreams上的转换 )。Spark项目包含了 Count-Min Sketch 和 HyperLogLog 的示例介绍。
如果你需要确定Algebird数据结构的内存介绍,比如Count-Min Sketch、HyperLogLog或者Bloom Filters,你可以使用SparkContext日志进行查看,更多细节参见 Determining Memory Consumption 。
Kafka整合
我前文所述的一些增补: