设为首页 收藏本站
查看: 1413|回复: 0

[经验分享] Spark RDD

[复制链接]

尚未签到

发表于 2019-1-30 10:22:52 | 显示全部楼层 |阅读模式
  Spark 的核心是RDD(Resilient Distributed Dataset),即弹性分布式数据集,是由AMPLab实验室提出的概念,属于一种分布式的内存系统数据集应用。Spark的主要优势来自RDD本身的特性,RDD能够与其他系统兼容,可以导入外部存储系统的数据集,例如HDFS、HBase、或者其他hadoop数据源。


1.1、RDD特性


  • RDD 的3种基本运算


RDD运算类型
说明




“转换”运算Transformation
RDD执行“转换”运算的结果,会产生另外一个RDD; RDD具有lazy特性,所有“转换”运算并不会立刻实际执行,等到执行“动作”运算才会实际执行;


“动作”运算Action
RDD执行“动作”运算后不会产生另一个RDD,而是产生数值、数组或写入文件系统; RDD执行“动作”运算会立刻实际执行,而且连同之前的“转换”运算一起执行;


“持久化”Persistence
对于那些会重复使用的RDD,可以将RDD“持久化”在内存中作为后续使用,以提高执行性能;


  • Lineage 机制具备容错的特性
  RDD本身具有Lineage机制,它会记录每个RDD与其父代RDD之间的关联,还会记录通过什么操作由父代RDD而得到该RDD的信息。
RDD本身的immutable(不可变)特性,再加上Lineage机制,使得Spark具备容错的特性。如果某个节点的机器出现了故障,那么存储在这个节点上的RDD损毁后会重新执行一连串的“转换”命令,产生新的输出数据,以避免因为某个节点的故障而造成整个系统无法允许的问题。

1.2、RDD基本操作

1.2.1、基本RDD"转换"运算
  创建RDD使用SparkContext的parallelize方法;


  • 创建intRDD

In [1]: intRDD = sc.parallelize([1,2,3,3,2,5,4,9,0,8,5])
In [2]: intRDD.collect()
Out[2]: [1, 2, 3, 3, 2, 5, 4, 9, 0, 8, 5]
  intRDD执行collect()后会转换为List,这是一个“动作”运算,所以会马上执行;


  • 创建stringRDD

In [3]: stringRDD = sc.parallelize(["Apple", "Orange", "Apple", "Grape", "Banana"])
In [4]: stringRDD.collect()
Out[4]: ['Apple', 'Orange', 'Apple', 'Grape', 'Banana']

  • map 运算

In [5]: intRDD.map(lambda x:x+1).collect()
Out[5]: [2, 3, 4, 4, 3, 6, 5, 10, 1, 9, 6]
In [6]: stringRDD.map(lambda x:"fruit:"+x).collect()
Out[6]: ['fruit:Apple', 'fruit:Orange', 'fruit:Apple', 'fruit:Grape', 'fruit:Banana']

  • filter数字运算

In [7]: intRDD.filter(lambda x:x>5).collect()
Out[7]: [9, 8]
In [8]: intRDD.filter(lambda x:x3).collect()
Out[8]: [5, 4, 5]

  • filter字符串运算

In [9]: stringRDD.filter(lambda x:"ra" in x).collect()
Out[9]: ['Orange', 'Grape']

  • distinct 去重运算

In [11]: intRDD.distinct().collect()
Out[11]: [4, 0, 8, 1, 5, 9, 2, 3]
In [12]: stringRDD.distinct().collect()
Out[12]: ['Orange', 'Apple', 'Grape', 'Banana']

  • randomSplit分割运算

In [13]: splitRDD = intRDD.randomSplit([0.4, 0.6])
In [14]: splitRDD[0].collect()
Out[14]: [2, 0, 5]
In [15]: splitRDD[1].collect()
Out[15]: [1, 2, 3, 3, 5, 4, 9, 8]

  • groupBy 运算

In [17]: groupRDD = intRDD.groupBy(lambda x:"even" if (x%2==0) else "odd").collect()
In [18]: print(groupRDD[0][0],sorted(groupRDD[0][1]))
even [0, 2, 2, 4, 8]
In [19]: print(groupRDD[1][0],sorted(groupRDD[1][1]))
odd [1, 3, 3, 5, 5, 9]
1.2.2、多个RDD"转换"运算


  • 创建3个RDD

In [20]: intRDD1 = sc.parallelize([1,3,5,7,9])
In [21]: intRDD2 = sc.parallelize([2,4,6,8,10])
In [22]: intRDD3 = sc.parallelize([0,1,2,3,4,5,6,7,8,9])

  • union函数进行并集运算

In [23]: intRDD1.union(intRDD2).union(intRDD3).collect()
Out[23]: [1, 3, 5, 7, 9, 2, 4, 6, 8, 10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

  • intersection 交集运算

In [24]: intRDD1.intersection(intRDD2).collect()
Out[24]: []
In [25]: intRDD1.intersection(intRDD3).collect()
Out[25]: [1, 9, 3, 5, 7]
In [26]: intRDD2.intersection(intRDD3).collect()
Out[26]: [8, 2, 4, 6]

  • subtract 差集运算

In [27]: intRDD3.subtract(intRDD1).collect()
Out[27]: [0, 8, 2, 4, 6]
In [28]: intRDD3.subtract(intRDD2).collect()
Out[28]: [0, 1, 9, 3, 5, 7]

  • cartesian笛卡尔乘积运算

In [29]: print(intRDD1.cartesian(intRDD2).collect())
[(1, 2), (1, 4), (1, 6), (1, 8), (1, 10), (3, 2), (3, 4), (3, 6), (3, 8), (3, 10), (5, 2), (5, 4), (5, 6), (5, 8), (5, 10), (7, 2), (9, 2), (7, 4), (9, 4), (7, 6), (9, 6), (7, 8), (7, 10), (9, 8), (9, 10)]
1.2.3、基本"动作"运算
  基本“动作”运算会马上执行,产生结果。

In [30]: intRDD.collect()
Out[30]: [1, 2, 3, 3, 2, 5, 4, 9, 0, 8, 5]
In [31]: intRDD.first()
Out[31]: 1
In [32]: intRDD.take(1)
Out[32]: [1]
In [33]: intRDD.take(2)
Out[33]: [1, 2]
In [34]: intRDD.takeOrdered(3)
Out[34]: [0, 1, 2]
In [35]: intRDD.takeOrdered(3,key=lambda x:-x)
Out[35]: [9, 8, 5]
In [36]: intRDD.stats()
Out[36]: (count: 11, mean: 3.8181818181818183, stdev: 2.6566616720368104, max: 9.0, min: 0.0)
In [37]: intRDD.min()
Out[37]: 0
In [38]: intRDD.max()
Out[38]: 9
In [39]: intRDD.stdev()
Out[39]: 2.6566616720368104
In [40]: intRDD.count()
Out[40]: 11
In [41]: intRDD.sum()
Out[41]: 42
In [42]: intRDD.mean()
Out[42]: 3.8181818181818183
1.2.4、RDD key-value 基本"转换"运算
  Spark RDD 支持键值(key-value)的运算,这也是Map/Reduce的基础;


  • 创建 key-value RDD

In [44]: kvRDD1 = sc.parallelize([(1,2),(3,4),(5,6),(7,8),(9,10)])
In [45]: kvRDD1.collect()
Out[45]: [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]

  • 列出全部key值

In [46]: kvRDD1.keys().collect()
Out[46]: [1, 3, 5, 7, 9]

  • 列出全部的value值

In [47]: kvRDD1.values().collect()
Out[47]: [2, 4, 6, 8, 10]

  • 使用filter 筛选key

In [48]: kvRDD1.filter(lambda keyValue:keyValue[0]

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669492-1-1.html 上篇帖子: Spark on Yarn 安装配置 下篇帖子: Spark stdout日志乱码
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表