hb_sz 发表于 2019-1-30 14:16:41

Spark操作—aggregate、aggregateByKey详解

import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext
  

  
object AggregateByKeyOp {
  
def main(args:Array){
  
   val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey").setMaster("local")
  
    val sc: SparkContext = new SparkContext(sparkConf)
  

  
   val data=List((1,3),(1,2),(1,4),(2,3))
  
   val rdd=sc.parallelize(data, 2)
  

  
   //合并不同partition中的值,a,b得数据类型为zeroValue的数据类型
  
   def combOp(a:String,b:String):String={
  
       println("combOp: "+a+"\t"+b)
  
       a+b
  
   }
  
   //合并在同一个partition中的值,a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型
  
      def seqOp(a:String,b:Int):String={
  
      println("SeqOp:"+a+"\t"+b)
  
      a+b
  
      }
  
      rdd.foreach(println)
  
      //zeroValue:中立值,定义返回value的类型,并参与运算
  
      //seqOp:用来在同一个partition中合并值
  
      //combOp:用来在不同partiton中合并值
  
      val aggregateByKeyRDD=rdd.aggregateByKey("100")(seqOp, combOp)
  
      sc.stop()
  
}
  
}
  
运行结果:
  
将数据拆分成两个分区
  

  
//分区一数据
  
(1,3)
  
(1,2)
  
//分区二数据
  
(1,4)
  
(2,3)
  

  
//分区一相同key的数据进行合并
  
seq: 100   3   //(1,3)开始和中立值进行合并合并结果为 1003
  
seq: 1003   2   //(1,2)再次合并 结果为 10032
  

  
//分区二相同key的数据进行合并
  
seq: 100   4//(1,4) 开始和中立值进行合并 1004
  
seq: 100   3//(2,3) 开始和中立值进行合并 1003
  

  
将两个分区的结果进行合并
  
//key为2的,只在一个分区存在,不需要合并 (2,1003)(2,1003)//key为1的, 在两个分区存在,并且数据类型一致,合并
  
comb: 10032   1004(1,100321004)


页: [1]
查看完整版本: Spark操作—aggregate、aggregateByKey详解