|
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AggregateByKeyOp {
def main(args:Array[String]){
val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey").setMaster("local")
val sc: SparkContext = new SparkContext(sparkConf)
val data=List((1,3),(1,2),(1,4),(2,3))
val rdd=sc.parallelize(data, 2)
//合并不同partition中的值,a,b得数据类型为zeroValue的数据类型
def combOp(a:String,b:String):String={
println("combOp: "+a+"\t"+b)
a+b
}
//合并在同一个partition中的值,a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型
def seqOp(a:String,b:Int):String={
println("SeqOp:"+a+"\t"+b)
a+b
}
rdd.foreach(println)
//zeroValue:中立值,定义返回value的类型,并参与运算
//seqOp:用来在同一个partition中合并值
//combOp:用来在不同partiton中合并值
val aggregateByKeyRDD=rdd.aggregateByKey("100")(seqOp, combOp)
sc.stop()
}
}
运行结果:
将数据拆分成两个分区
//分区一数据
(1,3)
(1,2)
//分区二数据
(1,4)
(2,3)
//分区一相同key的数据进行合并
seq: 100 3 //(1,3)开始和中立值进行合并 合并结果为 1003
seq: 1003 2 //(1,2)再次合并 结果为 10032
//分区二相同key的数据进行合并
seq: 100 4 //(1,4) 开始和中立值进行合并 1004
seq: 100 3 //(2,3) 开始和中立值进行合并 1003
将两个分区的结果进行合并
//key为2的,只在一个分区存在,不需要合并 (2,1003)(2,1003)//key为1的, 在两个分区存在,并且数据类型一致,合并
comb: 10032 1004(1,100321004)
|
|
|