woceshiyong8 posted on 2017-3-2 09:14:36

SparkStreaming updateStateByKey: keeping running state across batches

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreaming_StateFul {

def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setMaster("local[2]") // need at least 2 threads: one for the socket receiver, one for processing
      .setAppName(this.getClass.getSimpleName)
      .set("spark.executor.memory", "2g")
      .set("spark.cores.max", "8")
      .setJars(Array("E:\\ScalaSpace\\Spark_Streaming\\out\\artifacts\\Spark_Streaming.jar"))
    val context = new SparkContext(conf)

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _) // sum of this batch's counts for the key
      val previousCount = state.getOrElse(0)       // previous running total if the key exists, otherwise 0
      Some(currentCount + previousCount)
    }

    // step 1: create the streaming context with a 10-second batch interval
    val ssc = new StreamingContext(context,Seconds(10))
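    // NOTE: "." checkpoints to the current working directory, which only works
    // locally; on a cluster this must be a shared path such as HDFS (see the error below)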
    ssc.checkpoint(".")


    // step 2: create a network input stream on the given ip:port and count the words in the newline-delimited text
    val lines = ssc.socketTextStream("218.193.154.79",12345)

    val data = lines.flatMap(_.split(" "))
    val wordDstream = data.map(x => (x,1))

    // use updateStateByKey to fold each batch's counts into the running state
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)

    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
}
}
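
To feed the stream, start a plain text server on the listening host first; netcat is one convenient choice (the port must match the hard-coded 12345 above):

nc -lk 12345

Each line typed into nc is split into words, and stateDstream.print() shows the running total per word across all batches, e.g. (hello,3) once "hello" has been seen three times in total.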
If the job is run on a cluster, ssc.checkpoint(".") raises the following error:

org.apache.spark.SparkException: Checkpoint RDD ReliableCheckpointRDD at print at SparkStreaming_StateFul.scala:43(0) has different number of partitions from original RDD MapPartitionsRDD at updateStateByKey at SparkStreaming_StateFul.scala:41(2)
at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:73)
at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:74)

This is caused by not saving the checkpoint files to HDFS: with a relative local path, each executor writes its checkpoint partitions to its own local disk, so when the checkpointed RDD is read back some partition files are missing and the partition counts no longer match.
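
The fix is to point the checkpoint at a shared filesystem. A minimal sketch, assuming a hypothetical HDFS namenode at master:9000 (substitute your cluster's own address and path):

ssc.checkpoint("hdfs://master:9000/user/spark/checkpoint")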