设为首页 收藏本站
查看: 1236|回复: 0

[经验分享] sparkRDD 算子的创建和使用

[复制链接]

尚未签到

发表于 2019-1-31 08:11:12 | 显示全部楼层 |阅读模式
  spark是大数据领域近几年比较火的编程开发语言。有众多的好处,比如速度快,基于内存式计算框架。
  不多说直接讲 spark的RDD 算子的使用。
  如果有spark环境搭建等问题,请自行查找资料。本文不做讲述。
  spark rdd的创建有两种方式:
  1>从集合创建。也就是从父rdd继承过来
  2>从外部创建。
  

  

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import com.google.common.base.Optional;
import scala.Tuple2;
public class Demo01 {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Demo01").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
//map(jsc);
//filter(jsc);
    // flatMap(jsc);
//groupByKey(jsc);
//reduceByKey(jsc);
//sortByKey(jsc);
//join(jsc);
leftOutJoin(jsc);
jsc.stop();
}
//每一条元素 都乘以2,并且打印
private static void map(JavaSparkContext jsc) {
//数据源
List lst = Arrays.asList(1,2,3,4,5,6,7,8);
JavaRDD numRDD = jsc.parallelize(lst);
JavaRDD resultRDD = numRDD.map(new Function() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer num) throws Exception {
return num * 2;
}
});
resultRDD.foreach(new VoidFunction() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer num) throws Exception {
System.out.println(num);
}
});
}
// 把集合中的偶数过滤出来
private static void filter(JavaSparkContext jsc) {
//数据源
List lst = Arrays.asList(1,2,3,4,5,6,7,8);
JavaRDD numRDD = jsc.parallelize(lst);
System.out.println(numRDD.filter(new Function() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Integer num) throws Exception {
return num % 2 ==0;
}
}).collect());
}
//将一行行数据的单词拆分为一个个单词
private static void flatMap(JavaSparkContext jsc) {
List lst = Arrays.asList("hi tim ","hello girl","hello spark");
JavaRDD lines = jsc.parallelize(lst);
JavaRDD resultRDD = lines.flatMap(new FlatMapFunction() {
private static final long serialVersionUID = 1L;
@Override
public Iterable call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
System.out.println(resultRDD.collect());
}
// 根据班级进行分组
private static void groupByKey(JavaSparkContext jsc) {
// int ,Integer
// scala 里面的类型,没有像Java这样分为基本类型和包装类,因为scala是一种更加强的面向对象语言,
//一切皆对象,里面的类型,也有对应的方法可以调用,隐式转换
// 模拟数据
@SuppressWarnings("unchecked")
List lst = Arrays.asList(
new Tuple2("class01", 100),
new Tuple2("class02",101),
new Tuple2("class01",199),
new Tuple2("class02",121),
new Tuple2("class02",120));
JavaPairRDD cla***DD = jsc.parallelizePairs(lst);
JavaPairRDD groupedRDD = cla***DD.groupByKey();
groupedRDD.foreach(new VoidFunction() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2 tuple)
throws Exception {
String classKey = tuple._1;
Iterator values = tuple._2.iterator();
while (values.hasNext()) {
Integer value = values.next();
System.out.println("key:" + classKey + "\t" + "value:" + value);
}
}
});
}
private static void reduceByKey(JavaSparkContext jsc) {
@SuppressWarnings("unchecked")
List lst = Arrays.asList(
new Tuple2("class01", 100),
new Tuple2("class02",101),
new Tuple2("class01",199),
new Tuple2("class02",121),
new Tuple2("class02",120));
JavaPairRDD cla***DD = jsc.parallelizePairs(lst);
JavaPairRDD resultRDD = cla***DD.reduceByKey(new Function2() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
resultRDD.foreach(new VoidFunction() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2 tuple) throws Exception {
System.out.println("key:" + tuple._1 + "\t" + "value:" + tuple._2);
}
});
}
// 把学生的成绩前3名取出来,并打印
// 1.先排序sortByKey,然后take(3),再foreach
private static void sortByKey(JavaSparkContext jsc) {
@SuppressWarnings("unchecked")
List lst = Arrays.asList(
new Tuple2("tom", 60),
new Tuple2("kate",80),
new Tuple2("kobe",100),
new Tuple2("马蓉",4),
new Tuple2("宋哲",2),
new Tuple2("白百合",3),
new Tuple2("隔壁老王",1));
JavaPairRDD cla***DD = jsc.parallelizePairs(lst);
JavaPairRDD pairRDD = cla***DD.mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Tuple2 tuple)
throws Exception {
return new Tuple2(tuple._2, tuple._1);
}
});
//do no
JavaPairRDD sortedRDD = pairRDD.sortByKey();
JavaPairRDD sortedRDD01 = sortedRDD.mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Tuple2 tuple)
throws Exception {
return new Tuple2(tuple._2, tuple._1);
}
} );
// take 也是一个action操作
List result = sortedRDD01.take(3);
System.out.println(result);
}
private static void join(JavaSparkContext jsc) {
// 模拟数据
@SuppressWarnings("unchecked")
List names =Arrays.asList(
new Tuple2(1,"jack"),
new Tuple2(2,"rose"),
new Tuple2(3,"tom"),
new Tuple2(4,"赵丽颖"));
JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names);
List scores = Arrays.asList(
new Tuple2(1,60),
new Tuple2(4,100),
new Tuple2(2,30));
JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores);
JavaPairRDD joinedRDD = num2scoresRDD.join(num2NamesRDD);
//姓名成绩排序,取前2名
JavaPairRDD score2NameRDD = joinedRDD.mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(
Tuple2 tuple)
throws Exception {
Integer score = tuple._2._1;
String name = tuple._2._2;
return new Tuple2(score,name);
}
});
// sortByKey之后,你可以执行一个maptoPair的操作,转换为
System.out.println(score2NameRDD.sortByKey(false).take(2));
}
// 学生成绩改良版
private static void leftOutJoin(JavaSparkContext jsc) {
// 模拟数据
@SuppressWarnings("unchecked")
List names =Arrays.asList(
new Tuple2(1,"jack"),
new Tuple2(2,"rose"),
new Tuple2(3,"tom"),
new Tuple2(4,"赵丽颖"));
JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names);
List scores = Arrays.asList(
new Tuple2(1,60),
new Tuple2(4,100),
new Tuple2(2,30));
JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores);
// num2scoresRDD num2NamesRDD
//JavaPairRDD joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD);
// 注意join,谁join谁,没区别,但是leftoutjoin 是有顺序的
JavaPairRDD joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD);
JavaPairRDD pairRDD = joinedRDD.mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(
Tuple2 tuple)
throws Exception {
String name = tuple._2._1;
Optional scoreOptional = tuple._2._2;
Integer score = null;
         if(scoreOptional.isPresent()){
        score= scoreOptional.get();
         }else {
         score = 0;
         }
return new Tuple2(score, name);
}
});
JavaPairRDD sortedRDD = pairRDD.sortByKey(false);
sortedRDD.foreach(new VoidFunction() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2 tuple)
throws Exception {
if(tuple._1 == 0){
System.out.println("name:" + tuple._2 + "\t" + "要努力了,你的成绩0分" );
}else{
System.out.println("姓名:" + tuple._2 + "\t" + "分数:" + tuple._1);
}
}
});
}
}  如有疑问可跟帖讨论。欢迎拍砖




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669823-1-1.html 上篇帖子: 【互动问答分享】第2期决胜云计算大数据时代Spark亚太研究院公益大讲堂 下篇帖子: NetWordCound
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表