hc6538 发表于 2019-1-31 10:04:17

Streaming 与kafka updateStateBykey()

object H extends App{
      valconf=newSparkConf().setMaster("local").setAppName("hello")
      val ss=new StreamingContext(conf,Seconds(5))
      val kafkaParams=Map("metadata.broker.list"->"myhadoop1:9092")
      ss.checkpoint("hdfs://myhadoop1:8020/data")
      val topic=Set("wordcount1")
      //kafka
      val lines=KafkaUtils.createDirectStream(ss,kafkaParams,topic)
      lines.flatMap(_._2.split(" ")).map((_,1)).updateStateByKey((seqs:Seq,option:Option)=>{
                var oldValue=option.getOrElse(0)
                for(seq
页: [1]
查看完整版本: Streaming 与kafka updateStateBykey()