设为首页 收藏本站
查看: 1354|回复: 0

[经验分享] Spark的transformation 和 action的操作学习笔记

[复制链接]

尚未签到

发表于 2019-1-30 13:18:50 | 显示全部楼层 |阅读模式
  一、spark的transformation 和 action区别
  Spark有一些基本的transformation 和 action的操作,其中transformation形成各类型的RDD,action不形成RDD,而是对RDD进行累加、合并、保存操作。
  

  
  二、transformation 有哪些
  transformation有map、filter、flatMap(与map不一样)、Sample、groupByKey、ReduceByKey、Union、Join、cogroup、crossProduct、mapValues、sort、partitionBy,共13种。还有sortByKey呢?
  1、map:
  
val rdd = sc.parallelize(List(1,2,3,4,5,6))
val mapRdd = rdd.map(_*2)  //这是典型的函数式编程
mapRdd.collect()  //上面的map是transformation,到了这里的collect才开始执行,是action,返回一个Array    Array(2,4,6,8,10,12)


map(x=>(x,1)),将map(x)这样的,映射成map(x,1)这样的,一般用于对Key进行计数
  2、filter
  过滤,选择函数,
  
val filterRdd = mapRdd.filter(_ > 5)
filterRdd.collect() //返回所有大于5的数据的一个Array, Array(6,8,10,12)


  3、flatmap加上reduceBykey
  

  
val wordcount = rdd.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_+_)  //把每一行进行根据空格分割,然后flatMap会把多个list合并成一个list,最后把每个元素变成一个元组
//然后把具有相同key的元素的value进行相加操作,参考上面图片中的函数定义,针对reduceByKey,传入的函数是对value进行操作的。
wordcount.saveAsTextFile("/xxx/ss/aa")   //把结果存入文件系统
wordcount.collect //可以得到一个数组


  4、groupByKey
  对文件按照空格进行分割后,按照单词进行groupByKey分组
  
  val wordcount=rdd.flatMap(_.split(' ')).map(_.1)).groupByKey
  使用collect查看一下结果
  wordcount.collect
  


  

  5、Union
  2个合并成1个
  
val rdd1 = sc.parallelize(List(('a',1),(‘a’, 2)))
val rdd2 = sc.parallelize(List(('b',1),(‘b’, 2)))

val result_union = rdd1 union rdd2 //结果是把两个list合并成一个,List(('a',1),(‘a’, 2),('b',1),(‘b’, 2))
  


  6、Join
  笛卡尔积的干活,小组循环赛
  
val rdd1 = sc.parallelize(List(('a',1),(‘a’, 2), ('b', 3)))
val rdd2 = sc.parallelize(List(('a',4),(‘b’, 5)))

val result_union = rdd1 join rdd2 //结果是把两个list做笛卡尔积,Array(('a', (1,4), ('a', (2,4), ('b', (3, 5)))
  


  7、sortByKey
  排序,非常好用的哈
  
val wordcount = rdd.flatMap(_split(' ')).map(_,1).reduceByKey(_+_).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1))
//其实完成了一个sort by value的过程, sortByKey(false),表示倒序排列
  


  

  三、action有哪些
  action有count、collect、reduce、lookup、save5种。
  
1、count
计算rdd的个数
val rdd = sc.textFile("/xxx/sss/ee")
rdd.count //计算行数
rdd.cache   //可以把rdd保留在内存里面
rdd.count //计算行数,但是因为上面进行了cache,这里速度会很快
  
2、collect
  collect函数可以提取出所有rdd里的数据项
  val rdd1=sc.parallelize(List(('a',1),('b',1)))
  val rdd2=sc.parallelize(List(('c',1),('d',1)))
  val result=rdd1 union rdd2
  使用collect操作查看一下执行结果
  

  3、reduce
  map、reduce是hadoop的2个核心,map是映射,reduce是精简
val rdd = sc.parallelize(List(1,2,3,4))
rdd.reduce(_+_) //reduce是一个action,这里的结果是10


  4、lookup
  查找的干活
val rdd = sc.parallelize(List(('a',1),(‘a’, 2),('b',1),(‘b’, 2))
rdd.lookup("a") //返回一个seq, (1, 2) 是把a对应的所有元素的value提出来组成一个seq


  5、save
  查询搜索结果排名第 1 点击次序排在第 2 的数据
  val rdd1 = sc.textFile("hdfs://192.168.0.10:9000/input/SogouQ2.txt").map(_.split("\t"))   //长度为6错误,好像日志不标准,有的为6,有的不是  .filter(_.length==6)
  rdd1.count()
  val rdd2=rdd1.filter(_(3).toInt==1).filter(_(4).toInt==2).count()
  rdd2.saveAsTextFile("hdfs://192.168.0.10:9000/output/sogou1111/")
  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669656-1-1.html 上篇帖子: Spark之RDD持久化、广播、累加器 下篇帖子: Openfire+Spark+Pandion搭建实验(二)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表