Spark 2.x From Shallow to Deep, Part 6: RDD Java API Explained (4)
Before studying any Spark topic, it helps to first build a correct understanding of Spark; see the earlier post "正确理解spark" (Understanding Spark Correctly). This post explains the join-related APIs.
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<Integer, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, 2),
                new Tuple2<>(3, 4), new Tuple2<>(3, 6), new Tuple2<>(5, 6)));
JavaPairRDD<Integer, Integer> otherJavaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>(3, 9),
                new Tuple2<>(4, 5)));
// Result: [(4,([],[5])), (1,([2],[])), (3,([4, 6],[9])), (5,([6],[]))]
System.out.println(javaPairRDD.cogroup(otherJavaPairRDD).collect());
// Result: [(4,([],[5])), (1,([2],[])), (3,([4, 6],[9])), (5,([6],[]))]
// groupWith behaves exactly the same as cogroup
System.out.println(javaPairRDD.groupWith(otherJavaPairRDD).collect());
// Result: [(3,(4,9)), (3,(6,9))]
// Implemented on top of cogroup: keep only the keys whose cogroup result has values in both RDDs (see the sketch after this code listing)
System.out.println(javaPairRDD.join(otherJavaPairRDD).collect());
// Result: [(1,(2,Optional.empty)), (3,(4,Optional[9])), (3,(6,Optional[9])), (5,(6,Optional.empty))]
// Implemented on top of cogroup: the keys in the result come from the left RDD
System.out.println(javaPairRDD.leftOuterJoin(otherJavaPairRDD).collect());
// Result: [(4,(Optional.empty,5)), (3,(Optional[4],9)), (3,(Optional[6],9))]
// Implemented on top of cogroup: the keys in the result come from the right RDD
System.out.println(javaPairRDD.rightOuterJoin(otherJavaPairRDD).collect());
// Result: [(4,(Optional.empty,Optional[5])), (1,(Optional[2],Optional.empty)), (3,(Optional[4],Optional[9])), (3,(Optional[6],Optional[9])), (5,(Optional[6],Optional.empty))]
// Implemented on top of cogroup: the keys in the result are the union of the keys of both RDDs
System.out.println(javaPairRDD.fullOuterJoin(otherJavaPairRDD).collect());
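To make the "implemented on top of cogroup" comments above concrete, here is a minimal sketch (not Spark's actual source code) that rebuilds join from cogroup: for every key, emit the cross product of the two value Iterables, so keys missing from either side produce nothing. The name joinViaCogroup is just for illustration; the snippet relies on the Spark 2.x flatMapValues signature (a function returning an Iterable) and additionally needs java.util.List and java.util.ArrayList.

// Cross product of the two value Iterables per key
JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinViaCogroup =
        javaPairRDD.cogroup(otherJavaPairRDD)
                .flatMapValues(pair -> {
                    List<Tuple2<Integer, Integer>> out = new ArrayList<>();
                    for (Integer left : pair._1()) {
                        for (Integer right : pair._2()) {
                            out.add(new Tuple2<>(left, right));
                        }
                    }
                    return out;
                });
// Should print the same pairs as join above: [(3,(4,9)), (3,(6,9))]
System.out.println(joinViaCogroup.collect());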
As you can see from the above, cogroup is the most fundamental of these operations; the diagram below illustrates how cogroup works:
https://s5.运维网.com/wyfs02/M02/A5/A7/wKioL1nBGv2wEZjgAB0QijUwttE091.png-wh_500x0-wm_3-wmp_4-s_3307346149.png
If you want a more thorough understanding of how cogroup works internally, see: "spark core RDD api原理详解" (Spark Core RDD API Principles Explained).
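Along the same lines, here is a hedged sketch of how leftOuterJoin can fall out of the cogroup result: every left-side value is kept, and an empty right-side Iterable becomes an empty Optional. The name leftOuterViaCogroup is illustrative only, and the snippet assumes org.apache.spark.api.java.Optional plus the java.util imports mentioned above; rightOuterJoin and fullOuterJoin follow symmetrically, so they are not repeated here.

// Keep every left value; wrap the right side in Optional, using empty when the key
// is missing from otherJavaPairRDD
JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftOuterViaCogroup =
        javaPairRDD.cogroup(otherJavaPairRDD)
                .flatMapValues(pair -> {
                    List<Tuple2<Integer, Optional<Integer>>> out = new ArrayList<>();
                    for (Integer left : pair._1()) {
                        if (pair._2().iterator().hasNext()) {
                            for (Integer right : pair._2()) {
                                out.add(new Tuple2<>(left, Optional.of(right)));
                            }
                        } else {
                            out.add(new Tuple2<>(left, Optional.<Integer>empty()));
                        }
                    }
                    return out;
                });
// Should print the same pairs as leftOuterJoin above
System.out.println(leftOuterViaCogroup.collect());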