package spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object SparkStreamingKakfaWordCount {
def main(args: Array[String]) {
println("Start to run SparkStreamingKakfaWordCount")
val conf = new SparkConf().setAppName("SparkStreamingKakfaWordCount")
val ssc = new StreamingContext(conf, Seconds(3))
val topicMap = "test".split(":").map((_, 1)).toMap
// ZooKeeper quorum to connect to, as a comma-separated host:port list
val zkQuorum = "localhost:2181";
//consumer group
val group = "test-consumer-group"
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.print()
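// With the commented-out variant below, print shows only a series of empty () on the console, because
// println returns Unit and map therefore yields a stream of Unit values; the statement above prints each line of text as expected.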
// lines.map(line => {
// println(line)
// }).print
/**
 * Create an input stream that pulls messages from a Kafka Broker, with both tuple elements
 * decoded as strings.
 *
 * Convenience overload: it assembles the consumer configuration (`zookeeper.connect`,
 * `group.id`, and a fixed `zookeeper.connection.timeout.ms` of 10000) and delegates to the
 * generic `createStream` variant using `StringDecoder` for both type parameters.
 *
 * @param ssc          StreamingContext object
 * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId      The group id for this consumer
 * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is
 *                     consumed in its own thread
 * @param storageLevel Storage level to use for storing the received objects
 *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @return a receiver-based input stream of (String, String) tuples
 *         (presumably message key and message value -- confirm against the generic overload)
 */
def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[(String, String)] = {
  // Consumer configuration handed to the generic overload; one entry per line for readability.
  val consumerSettings: Map[String, String] = Map(
    "zookeeper.connect" -> zkQuorum,
    "group.id" -> groupId,
    "zookeeper.connection.timeout.ms" -> "10000"
  )

  createStream[String, String, StringDecoder, StringDecoder](
    ssc,
    consumerSettings,
    topics,
    storageLevel
  )
}
3.3 为什么下面代码会打印()