设为首页 收藏本站
查看: 1457|回复: 0

[经验分享] spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式

[复制链接]
发表于 2019-1-31 07:13:43 | 显示全部楼层 |阅读模式
  学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark

  

  我们在 http://7639240.blog.运维网.com/7629240/1966131 中已经知道了,一个scala函数其实就是java中的一个接口,对于java8 lambda而言,也是一样,一个lambda表达式就是java中的一个接口。接下来我们先看看spark中最简单的wordcount这个例子,分别用java8的非lambda以及lambda来实现:
  

  一、非lambda实现的java spark wordcount程序:
public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //JavaPairRDD inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt",
        //        TextInputFormat.class, LongWritable.class, Text.class);
        JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt");
        JavaRDD wordsRDD = inputRDD.flatMap(new FlatMapFunction() {
            @Override
            public Iterator call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        JavaPairRDD keyValueWordsRDD
                = wordsRDD.mapToPair(new PairFunction() {
            @Override
            public Tuple2 call(String s) throws Exception {
                return new Tuple2(s, 1);
            }
        });
        JavaPairRDD wordCountRDD =
                keyValueWordsRDD.reduceByKey(new HashPartitioner(2),
                        new Function2() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        //如果输出文件存在的话需要删除掉
        File outputFile = new File("/Users/tangweiqun/wordcount");
        if (outputFile.exists()) {
            File[] files = outputFile.listFiles();
            for(File file: files) {
                file.delete();
            }
            outputFile.delete();
        }
        wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount");
        System.out.println(wordCountRDD.collect());
    }
}  

  二、java8 lambda实现的wordcount代码
public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //JavaPairRDD inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt",
        //        TextInputFormat.class, LongWritable.class, Text.class);
        JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt");
        JavaRDD wordsRDD = inputRDD.flatMap(input -> Arrays.asList(input.split(" ")).iterator());
        JavaPairRDD keyValueWordsRDD
                = wordsRDD.mapToPair(word -> new Tuple2(word, 1));
        JavaPairRDD wordCountRDD = keyValueWordsRDD.reduceByKey((a, b) -> a + b);
        //如果输出文件存在的话需要删除掉
        File outputFile = new File("/Users/tangweiqun/wordcount");
        if (outputFile.exists()) {
            File[] files = outputFile.listFiles();
            for(File file: files) {
                file.delete();
            }
            outputFile.delete();
        }
        wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount");
        System.out.println(wordCountRDD.collect());
    }
}  

  从上面可以看出,lambda的实现更加简洁,也可以看出一个lambda函数表达式就是一个java接口。
  

  我们在http://7639240.blog.运维网.com/7629240/1966958提到的combineByKey,如下的代码:
JavaPairRDD javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2),
                new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);
//当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数
Function createCombiner = new Function() {
    @Override
    public Tuple2 call(Integer value) throws Exception {
        return new Tuple2(value, 1);
    }
};
//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数
Function2 mergeValue =
        new Function2() {
            @Override
            public Tuple2 call(Tuple2 acc, Integer value) throws Exception {
                return new Tuple2(acc._1() + value, acc._2() + 1);
            }
        };
//当需要对不同分区的数据进行聚合的时候应用这个函数
Function2 mergeCombiners =
        new Function2() {
            @Override
            public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception {
                return new Tuple2(acc1._1() + acc2._1(), acc1._2() + acc2._2());
            }
        };
JavaPairRDD combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
//结果:[(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());  可以写成如下的lambda实现的combineByKey:
JavaPairRDD javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2),
                new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);
//当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数
Function createCombiner = value -> new Tuple2(value, 1);
//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数
Function2 mergeValue = (acc, value) ->new Tuple2(acc._1() + value, acc._2() + 1);
//当需要对不同分区的数据进行聚合的时候应用这个函数
Function2 mergeCombiners = (acc1, acc2) -> new Tuple2(acc1._1() + acc2._1(), acc1._2() + acc2._2());
JavaPairRDD combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
//结果:[(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());  

  如果想深入的系统的理解spark RDD api可以参考: spark core RDD api原理详解




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669784-1-1.html 上篇帖子: 第4课:Spark Streaming的Exactly 下篇帖子: (版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表