设为首页 收藏本站
查看: 1014|回复: 0

[经验分享] spark2.x由浅入深深到底系列六之RDD java api详解三

[复制链接]

尚未签到

发表于 2019-1-30 14:12:05 | 显示全部楼层 |阅读模式
  学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark
  

  本文详细介绍了spark key-value类型的rdd java api
  

  一、key-value类型的RDD的创建方式
  1、sparkContext.parallelizePairs

JavaPairRDD javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3)));
//结果:[(test,3), (kkk,3)]
System.out.println("javaPairRDD = " + javaPairRDD.collect());  2、keyBy的方式
public class User implements Serializable {
    private String userId;
    private Integer amount;
    public User(String userId, Integer amount) {
        this.userId = userId;
        this.amount = amount;
    }
    @Override
    public String toString() {
        return "User{" +
                "userId='" + userId + '\'' +
                ", amount=" + amount +
                '}';
    }
}
JavaRDD userJavaRDD = sc.parallelize(Arrays.asList(new User("u1", 20)));
JavaPairRDD userJavaPairRDD = userJavaRDD.keyBy(new Function() {
    @Override
    public String call(User user) throws Exception {
        return user.getUserId();
    }
});
//结果:[(u1,User{userId='u1', amount=20})]
System.out.println("userJavaPairRDD = " + userJavaPairRDD.collect());  3、zip的方式
JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
//两个rdd zip也是创建key-value类型RDD的一种方式
JavaPairRDD zipPairRDD = rdd.zip(rdd);
//结果:[(1,1), (1,1), (2,2), (3,3), (5,5), (8,8), (13,13)]
System.out.println("zipPairRDD = " + zipPairRDD.collect());  4、groupBy的方式
JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function isEven = new Function() {
    @Override
    public Boolean call(Integer x) throws Exception {
        return x % 2 == 0;
    }
};
//将偶数和奇数分组,生成key-value类型的RDD
JavaPairRDD oddsAndEvens = rdd.groupBy(isEven);
//结果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
//结果:1
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());
oddsAndEvens = rdd.groupBy(isEven, 2);
//结果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
//结果:2
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());  二、combineByKey
JavaPairRDD javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2),
                new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);
//当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数
Function createCombiner = new Function() {
    @Override
    public Tuple2 call(Integer value) throws Exception {
        return new Tuple2(value, 1);
    }
};
//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数
Function2 mergeValue =
        new Function2() {
            @Override
            public Tuple2 call(Tuple2 acc, Integer value) throws Exception {
                return new Tuple2(acc._1() + value, acc._2() + 1);
            }
        };
//当需要对不同分区的数据进行聚合的时候应用这个函数
Function2 mergeCombiners =
        new Function2() {
            @Override
            public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception {
                return new Tuple2(acc1._1() + acc2._1(), acc1._2() + acc2._2());
            }
        };
JavaPairRDD combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
//结果:[(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());  combineByKey的数据流如下:

  对于combineByKey的原理讲解详细见: spark core RDD api原理详解
  三、aggregateByKey
JavaPairRDD aggregateByKeyRDD =
        javaPairRDD.aggregateByKey(new Tuple2(0, 0), mergeValue, mergeCombiners);
//结果:[(coffee,(12,3)), (panda,(3,1))]
System.out.println("aggregateByKeyRDD = " + aggregateByKeyRDD.collect());
//aggregateByKey是由combineByKey实现的,上面的aggregateByKey就是等于下面的combineByKeyRDD
Function createCombinerAggregateByKey =
        new Function() {
            @Override
            public Tuple2 call(Integer value) throws Exception {
                return mergeValue.call(new Tuple2(0, 0), value);
            }
        };
//结果是: [(coffee,(12,3)), (panda,(3,1))]
System.out.println(javaPairRDD.combineByKey(createCombinerAggregateByKey, mergeValue, mergeCombiners).collect());  四、reduceByKey
JavaPairRDD reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2() {
    @Override
    public Integer call(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
//结果:[(coffee,12), (panda,3)]
System.out.println("reduceByKeyRDD = " + reduceByKeyRDD.collect());
//reduceByKey底层也是combineByKey实现的,上面的reduceByKey等于下面的combineByKey
Function createCombinerReduce = new Function() {
    @Override
    public Integer call(Integer integer) throws Exception {
        return integer;
    }
};
Function2 mergeValueReduce =
        new Function2() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };
//结果:[(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(createCombinerReduce, mergeValueReduce, mergeValueReduce).collect());  五、foldByKey
JavaPairRDD foldByKeyRDD = javaPairRDD.foldByKey(0, new Function2() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
});
//结果:[(coffee,12), (panda,3)]
System.out.println("foldByKeyRDD = " + foldByKeyRDD.collect());
//foldByKey底层也是combineByKey实现的,上面的foldByKey等于下面的combineByKey
Function2 mergeValueFold =
        new Function2() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };
Function createCombinerFold = new Function() {
    @Override
    public Integer call(Integer integer) throws Exception {
        return mergeValueFold.call(0, integer);
    }
};
//结果:[(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(createCombinerFold, mergeValueFold, mergeValueFold).collect());  六、groupByKey
JavaPairRDD groupByKeyRDD = javaPairRDD.groupByKey();
//结果:[(coffee,[1, 2, 9]), (panda,[3])]
System.out.println("groupByKeyRDD = " + groupByKeyRDD.collect());
//groupByKey底层也是combineByKey实现的,上面的groupByKey等于下面的combineByKey
Function createCombinerGroup = new Function() {
    @Override
    public List call(Integer integer) throws Exception {
        List list = new ArrayList();
        list.add(integer);
        return list;
    }
};
Function2 mergeValueGroup = new Function2() {
    @Override
    public List call(List integers, Integer integer) throws Exception {
        integers.add(integer);
        return integers;
    }
};
Function2 mergeCombinersGroup =
        new Function2() {
            @Override
            public List call(List integers, List integers2) throws Exception {
                integers.addAll(integers2);
                return integers;
            }
        };
//结果:[(coffee,[1, 2, 9]), (panda,[3])]
System.out.println(javaPairRDD.combineByKey(createCombinerGroup, mergeValueGroup, mergeCombinersGroup).collect());  

  对于api原理性的东西很难用文档说明清楚,如果想更深入,更透彻的理解api的原理,可以参考: spark core RDD api原理详解




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669700-1-1.html 上篇帖子: Spark RDDs vs DataFrames vs SparkSQL-13101614 下篇帖子: spark资源网址02-BigData
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表