第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密
/**
* RDD storing the keyed states of `mapWithState` operation and corresponding mapped data.
* Each partition of this RDD has a single record of type `MapWithStateRDDRecord`. This contains a
* `StateMap` (containing the keyed-states) and the sequence of records returned by the mapping
* function of `mapWithState`.
* @param prevStateRDD The previous MapWithStateRDD on whose StateMap data `this` RDD
* will be created
* @param partitionedDataRDD The partitioned data RDD which is used to update the previous StateMaps
* in the `prevStateRDD` to create `this` RDD
* @param mappingFunction The function that will be used to update state and return new data
* @param batchTime The time of the batch to which this RDD belongs to. Use to update
* @param timeoutThresholdTime The time used to indicate which keys have timed out
*/private class MapWithStateRDD( private var prevStateRDD: RDD], private var partitionedDataRDD: RDD[(K, V)],
mappingFunction: (Time, K, Option, State) => Option,
batchTime: Time,
timeoutThresholdTime: Option
) extends RDD](
partitionedDataRDD.sparkContext,
List( new OneToOneDependency](prevStateRDD), new OneToOneDependency(partitionedDataRDD))
) {
// Set to true by `checkpoint()`; `compute` forwards it as `removeTimedoutData`.
@volatile private var doFullScan = false

// The previous state RDD must have a partitioner, and the data RDD must use the same one.
require(prevStateRDD.partitioner.nonEmpty)
require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)

// This RDD keeps the partitioner of the previous state RDD.
override val partitioner = prevStateRDD.partitioner

/**
 * Delegates to the parent `checkpoint()` and then enables the full-scan flag, so that the
 * next `compute` call passes `removeTimedoutData = true`.
 */
override def checkpoint(): Unit = {
  super.checkpoint()
  doFullScan = true
}

/**
 * Builds the single record of one partition of this RDD.
 *
 * Reads the previous record (if there is one) from `prevStateRDD` and the data of the matching
 * partition of `partitionedDataRDD`, then hands both to
 * `MapWithStateRDDRecord.updateRecordWithData` together with the mapping function, the batch
 * time and the timeout threshold.
 *
 * @param partition The partition of this RDD to compute
 * @param context The task context, forwarded to the parent RDD iterators
 * @return An iterator holding exactly one record: the updated record for this partition
 */
override def compute(
    partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {
  // NOTE(review): the type arguments on the return type and on `asInstanceOf` were lost when
  // this excerpt was extracted; they are restored here as `MapWithStateRDDRecord[K, S, E]` and
  // `MapWithStateRDDPartition` -- confirm against the class's type parameters.
  val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]

  val prevStateRDDIterator = prevStateRDD.iterator(
    stateRDDPartition.previousSessionRDDPartition, context)
  val dataIterator = partitionedDataRDD.iterator(
    stateRDDPartition.partitionedDataRDDPartition, context)

  // Only the first element of the previous partition is read; None if it is empty.
  val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None

  val newRecord = MapWithStateRDDRecord.updateRecordWithData(
    prevRecord,
    dataIterator,
    mappingFunction,
    batchTime,
    timeoutThresholdTime,
    removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
  )
  Iterator(newRecord)
}
页:
[1]