|
Spark 的核心是RDD(Resilient Distributed Dataset),即弹性分布式数据集,是由AMPLab实验室提出的概念,属于一种分布式的内存系统数据集应用。Spark的主要优势来自RDD本身的特性,RDD能够与其他系统兼容,可以导入外部存储系统的数据集,例如HDFS、HBase、或者其他hadoop数据源。
1.1、RDD特性
RDD运算类型
说明
“转换”运算Transformation
RDD执行“转换”运算的结果,会产生另外一个RDD; RDD具有lazy特性,所有“转换”运算并不会立刻实际执行,等到执行“动作”运算才会实际执行;
“动作”运算Action
RDD执行“动作”运算后不会产生另一个RDD,而是产生数值、数组或写入文件系统; RDD执行“动作”运算会立刻实际执行,而且连同之前的“转换”运算一起执行;
“持久化”Persistence
对于那些会重复使用的RDD,可以将RDD“持久化”在内存中作为后续使用,以提高执行性能;
RDD本身具有Lineage机制,它会记录每个RDD与其父代RDD之间的关联,还会记录通过什么操作由父代RDD而得到该RDD的信息。
RDD本身的immutable(不可变)特性,再加上Lineage机制,使得Spark具备容错的特性。如果某个节点的机器出现了故障,那么存储在这个节点上的RDD损毁后会重新执行一连串的“转换”命令,产生新的输出数据,以避免因为某个节点的故障而造成整个系统无法允许的问题。
1.2、RDD基本操作
1.2.1、基本RDD"转换"运算
创建RDD使用SparkContext的parallelize方法;
In [1]: intRDD = sc.parallelize([1,2,3,3,2,5,4,9,0,8,5])
In [2]: intRDD.collect()
Out[2]: [1, 2, 3, 3, 2, 5, 4, 9, 0, 8, 5]
intRDD执行collect()后会转换为List,这是一个“动作”运算,所以会马上执行;
In [3]: stringRDD = sc.parallelize(["Apple", "Orange", "Apple", "Grape", "Banana"])
In [4]: stringRDD.collect()
Out[4]: ['Apple', 'Orange', 'Apple', 'Grape', 'Banana']
In [5]: intRDD.map(lambda x:x+1).collect()
Out[5]: [2, 3, 4, 4, 3, 6, 5, 10, 1, 9, 6]
In [6]: stringRDD.map(lambda x:"fruit:"+x).collect()
Out[6]: ['fruit:Apple', 'fruit:Orange', 'fruit:Apple', 'fruit:Grape', 'fruit:Banana']
In [7]: intRDD.filter(lambda x:x>5).collect()
Out[7]: [9, 8]
In [8]: intRDD.filter(lambda x:x3).collect()
Out[8]: [5, 4, 5]
In [9]: stringRDD.filter(lambda x:"ra" in x).collect()
Out[9]: ['Orange', 'Grape']
In [11]: intRDD.distinct().collect()
Out[11]: [4, 0, 8, 1, 5, 9, 2, 3]
In [12]: stringRDD.distinct().collect()
Out[12]: ['Orange', 'Apple', 'Grape', 'Banana']
In [13]: splitRDD = intRDD.randomSplit([0.4, 0.6])
In [14]: splitRDD[0].collect()
Out[14]: [2, 0, 5]
In [15]: splitRDD[1].collect()
Out[15]: [1, 2, 3, 3, 5, 4, 9, 8]
In [17]: groupRDD = intRDD.groupBy(lambda x:"even" if (x%2==0) else "odd").collect()
In [18]: print(groupRDD[0][0],sorted(groupRDD[0][1]))
even [0, 2, 2, 4, 8]
In [19]: print(groupRDD[1][0],sorted(groupRDD[1][1]))
odd [1, 3, 3, 5, 5, 9]
1.2.2、多个RDD"转换"运算
In [20]: intRDD1 = sc.parallelize([1,3,5,7,9])
In [21]: intRDD2 = sc.parallelize([2,4,6,8,10])
In [22]: intRDD3 = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
In [23]: intRDD1.union(intRDD2).union(intRDD3).collect()
Out[23]: [1, 3, 5, 7, 9, 2, 4, 6, 8, 10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [24]: intRDD1.intersection(intRDD2).collect()
Out[24]: []
In [25]: intRDD1.intersection(intRDD3).collect()
Out[25]: [1, 9, 3, 5, 7]
In [26]: intRDD2.intersection(intRDD3).collect()
Out[26]: [8, 2, 4, 6]
In [27]: intRDD3.subtract(intRDD1).collect()
Out[27]: [0, 8, 2, 4, 6]
In [28]: intRDD3.subtract(intRDD2).collect()
Out[28]: [0, 1, 9, 3, 5, 7]
In [29]: print(intRDD1.cartesian(intRDD2).collect())
[(1, 2), (1, 4), (1, 6), (1, 8), (1, 10), (3, 2), (3, 4), (3, 6), (3, 8), (3, 10), (5, 2), (5, 4), (5, 6), (5, 8), (5, 10), (7, 2), (9, 2), (7, 4), (9, 4), (7, 6), (9, 6), (7, 8), (7, 10), (9, 8), (9, 10)]
1.2.3、基本"动作"运算
基本“动作”运算会马上执行,产生结果。
In [30]: intRDD.collect()
Out[30]: [1, 2, 3, 3, 2, 5, 4, 9, 0, 8, 5]
In [31]: intRDD.first()
Out[31]: 1
In [32]: intRDD.take(1)
Out[32]: [1]
In [33]: intRDD.take(2)
Out[33]: [1, 2]
In [34]: intRDD.takeOrdered(3)
Out[34]: [0, 1, 2]
In [35]: intRDD.takeOrdered(3,key=lambda x:-x)
Out[35]: [9, 8, 5]
In [36]: intRDD.stats()
Out[36]: (count: 11, mean: 3.8181818181818183, stdev: 2.6566616720368104, max: 9.0, min: 0.0)
In [37]: intRDD.min()
Out[37]: 0
In [38]: intRDD.max()
Out[38]: 9
In [39]: intRDD.stdev()
Out[39]: 2.6566616720368104
In [40]: intRDD.count()
Out[40]: 11
In [41]: intRDD.sum()
Out[41]: 42
In [42]: intRDD.mean()
Out[42]: 3.8181818181818183
1.2.4、RDD key-value 基本"转换"运算
Spark RDD 支持键值(key-value)的运算,这也是Map/Reduce的基础;
In [44]: kvRDD1 = sc.parallelize([(1,2),(3,4),(5,6),(7,8),(9,10)])
In [45]: kvRDD1.collect()
Out[45]: [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
In [46]: kvRDD1.keys().collect()
Out[46]: [1, 3, 5, 7, 9]
In [47]: kvRDD1.values().collect()
Out[47]: [2, 4, 6, 8, 10]
In [48]: kvRDD1.filter(lambda keyValue:keyValue[0] |
|