Streaming 与kafka updateStateBykey()
object H extends App{valconf=newSparkConf().setMaster("local").setAppName("hello")
val ss=new StreamingContext(conf,Seconds(5))
val kafkaParams=Map("metadata.broker.list"->"myhadoop1:9092")
ss.checkpoint("hdfs://myhadoop1:8020/data")
val topic=Set("wordcount1")
//kafka
val lines=KafkaUtils.createDirectStream(ss,kafkaParams,topic)
lines.flatMap(_._2.split(" ")).map((_,1)).updateStateByKey((seqs:Seq,option:Option)=>{
var oldValue=option.getOrElse(0)
for(seq
页:
[1]