zhouu 发表于 2019-1-31 06:49:02

第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

/**  
* RDD storing the keyed states of `mapWithState` operation and corresponding mapped data.
  
* Each partition of this RDD has a single record of type `MapWithStateRDDRecord`. This contains a
  
* `StateMap` (containing the keyed-states) and the sequence of records returned by the mapping
  
* function of `mapWithState`.
  
* @param prevStateRDD The previous MapWithStateRDD on whose StateMap data `this` RDD
  
*                  will be created
  
* @param partitionedDataRDD The partitioned data RDD which is used to update the previous StateMaps
  
*                           in the `prevStateRDD` to create `this` RDD
  
* @param mappingFunction The function that will be used to update state and return new data
  
* @param batchTime      The time of the batch to which this RDD belongs. Used to update
  
* @param timeoutThresholdTime The time to indicate which keys are timeout
  
*/private class MapWithStateRDD(    private var prevStateRDD: RDD],    private var partitionedDataRDD: RDD[(K, V)],
  
    mappingFunction: (Time, K, Option, State) => Option,
  
    batchTime: Time,
  
    timeoutThresholdTime: Option
  
) extends RDD](
  
    partitionedDataRDD.sparkContext,
  
    List(      new OneToOneDependency](prevStateRDD),      new OneToOneDependency(partitionedDataRDD))
  
) {
  

  
@volatile private var doFullScan = false
  

  
require(prevStateRDD.partitioner.nonEmpty)
  
require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)override val partitioner = prevStateRDD.partitioneroverride def checkpoint(): Unit = {
  
    super.checkpoint()
  
    doFullScan = true
  
}override def compute(
  
      partition: Partition, context: TaskContext): Iterator] = {    val stateRDDPartition = partition.asInstanceOf    val prevStateRDDIterator = prevStateRDD.iterator(
  
      stateRDDPartition.previousSessionRDDPartition, context)    val dataIterator = partitionedDataRDD.iterator(
  
      stateRDDPartition.partitionedDataRDDPartition, context)    val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None    val newRecord = MapWithStateRDDRecord.updateRecordWithData(
  
      prevRecord,
  
      dataIterator,
  
      mappingFunction,
  
      batchTime,
  
      timeoutThresholdTime,
  
      removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
  
    )
  
    Iterator(newRecord)
  
}


页: [1]
查看完整版本: 第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密