Next, write the Spark side of the code.
First, add the Maven dependency for the Spark Streaming Kafka connector:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.2.1</version>
</dependency>
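
The program below also uses the core Spark Streaming API (StreamingContext, DStream operations), so the spark-streaming artifact needs to be on the classpath as well. A minimal sketch, assuming the same Spark 1.2.1 / Scala 2.10 build as the connector above:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.2.1</version>
</dependency>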
The code is as follows:

package cn.han
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
object MySparkkafka {
  def main(args: Array[String]): Unit = {
    // Point Spark at the local Hadoop installation (needed on Windows)
    System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0")
    val sc = new SparkContext("local[2]", "Spark Streaming Kafka Integration")
    // Create the StreamingContext with a 4-second batch interval
    val ssc = new StreamingContext(sc, Seconds(4))
    // Declared here but not used by the createStream overload below
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran")
    val readParallelism = 5
    // Consume the "test" topic with one receiver thread
    val topics = Map("test" -> 1)
    val sl = StorageLevel.MEMORY_ONLY
    // Receiver-based stream: connect through ZooKeeper as consumer group "terran"
    val kafkaStream = KafkaUtils.createStream(ssc, "192.168.5.220:2181", "terran", topics, sl)
    // Each element is a (key, message) pair; split each message into words
    val words = kafkaStream.flatMap { case (_, message) => message.split(" ") }
    // Classic word count over each batch
    val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
    counts.print()
    // Start the computation
    ssc.start()
    // Block until the streaming job is stopped
    ssc.awaitTermination()
    sc.stop()
  }
}
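
Note that the kafkaParams map declared above is never passed to anything: the five-argument createStream overload builds its consumer configuration from the ZooKeeper address and group id strings alone. To pass extra consumer properties, the typed overload that accepts a parameter map can be used instead. A minimal sketch, assuming the same topic and ZooKeeper address (kafka.serializer.StringDecoder comes from the Kafka client that spark-streaming-kafka pulls in):

import kafka.serializer.StringDecoder

// High-level consumer properties; zookeeper.connect and group.id are required
val fullParams = Map(
  "zookeeper.connect" -> "192.168.5.220:2181",
  "group.id" -> "terran")
// Typed overload: key/value types and their decoders are given explicitly
val typedStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, fullParams, topics, StorageLevel.MEMORY_ONLY)

For a quick end-to-end test, publish a few space-separated words to the "test" topic with Kafka's console producer; every 4 seconds the driver console should print the per-batch counts, e.g. (hello,2) and (spark,1).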