Spark RDD
Spark 的核心是RDD(Resilient Distributed Dataset),即弹性分布式数据集,是由AMPLab实验室提出的概念,属于一种分布式的内存系统数据集应用。Spark的主要优势来自RDD本身的特性,RDD能够与其他系统兼容,可以导入外部存储系统的数据集,例如HDFS、HBase、或者其他hadoop数据源。1.1、RDD特性
[*]RDD 的3种基本运算
RDD运算类型
说明
“转换”运算Transformation
RDD执行“转换”运算的结果,会产生另外一个RDD; RDD具有lazy特性,所有“转换”运算并不会立刻实际执行,等到执行“动作”运算才会实际执行;
“动作”运算Action
RDD执行“动作”运算后不会产生另一个RDD,而是产生数值、数组或写入文件系统; RDD执行“动作”运算会立刻实际执行,而且连同之前的“转换”运算一起执行;
“持久化”Persistence
对于那些会重复使用的RDD,可以将RDD“持久化”在内存中作为后续使用,以提高执行性能;
[*]Lineage 机制具备容错的特性
RDD本身具有Lineage机制,它会记录每个RDD与其父代RDD之间的关联,还会记录通过什么操作由父代RDD而得到该RDD的信息。
RDD本身的immutable(不可变)特性,再加上Lineage机制,使得Spark具备容错的特性。如果某个节点的机器出现了故障,那么存储在这个节点上的RDD损毁后会重新执行一连串的“转换”命令,产生新的输出数据,以避免因为某个节点的故障而造成整个系统无法允许的问题。
1.2、RDD基本操作
1.2.1、基本RDD"转换"运算
创建RDD使用SparkContext的parallelize方法;
[*]创建intRDD
In : intRDD = sc.parallelize()
In : intRDD.collect()
Out:
intRDD执行collect()后会转换为List,这是一个“动作”运算,所以会马上执行;
[*]创建stringRDD
In : stringRDD = sc.parallelize(["Apple", "Orange", "Apple", "Grape", "Banana"])
In : stringRDD.collect()
Out: ['Apple', 'Orange', 'Apple', 'Grape', 'Banana']
[*]map 运算
In : intRDD.map(lambda x:x+1).collect()
Out:
In : stringRDD.map(lambda x:"fruit:"+x).collect()
Out: ['fruit:Apple', 'fruit:Orange', 'fruit:Apple', 'fruit:Grape', 'fruit:Banana']
[*]filter数字运算
In : intRDD.filter(lambda x:x>5).collect()
Out:
In : intRDD.filter(lambda x:x3).collect()
Out:
[*]filter字符串运算
In : stringRDD.filter(lambda x:"ra" in x).collect()
Out: ['Orange', 'Grape']
[*]distinct 去重运算
In : intRDD.distinct().collect()
Out:
In : stringRDD.distinct().collect()
Out: ['Orange', 'Apple', 'Grape', 'Banana']
[*]randomSplit分割运算
In : splitRDD = intRDD.randomSplit()
In : splitRDD.collect()
Out:
In : splitRDD.collect()
Out:
[*]groupBy 运算
In : groupRDD = intRDD.groupBy(lambda x:"even" if (x%2==0) else "odd").collect()
In : print(groupRDD,sorted(groupRDD))
even
In : print(groupRDD,sorted(groupRDD))
odd
1.2.2、多个RDD"转换"运算
[*]创建3个RDD
In : intRDD1 = sc.parallelize()
In : intRDD2 = sc.parallelize()
In : intRDD3 = sc.parallelize()
[*]union函数进行并集运算
In : intRDD1.union(intRDD2).union(intRDD3).collect()
Out:
[*]intersection 交集运算
In : intRDD1.intersection(intRDD2).collect()
Out: []
In : intRDD1.intersection(intRDD3).collect()
Out:
In : intRDD2.intersection(intRDD3).collect()
Out:
[*]subtract 差集运算
In : intRDD3.subtract(intRDD1).collect()
Out:
In : intRDD3.subtract(intRDD2).collect()
Out:
[*]cartesian笛卡尔乘积运算
In : print(intRDD1.cartesian(intRDD2).collect())
[(1, 2), (1, 4), (1, 6), (1, 8), (1, 10), (3, 2), (3, 4), (3, 6), (3, 8), (3, 10), (5, 2), (5, 4), (5, 6), (5, 8), (5, 10), (7, 2), (9, 2), (7, 4), (9, 4), (7, 6), (9, 6), (7, 8), (7, 10), (9, 8), (9, 10)]
1.2.3、基本"动作"运算
基本“动作”运算会马上执行,产生结果。
In : intRDD.collect()
Out:
In : intRDD.first()
Out: 1
In : intRDD.take(1)
Out:
In : intRDD.take(2)
Out:
In : intRDD.takeOrdered(3)
Out:
In : intRDD.takeOrdered(3,key=lambda x:-x)
Out:
In : intRDD.stats()
Out: (count: 11, mean: 3.8181818181818183, stdev: 2.6566616720368104, max: 9.0, min: 0.0)
In : intRDD.min()
Out: 0
In : intRDD.max()
Out: 9
In : intRDD.stdev()
Out: 2.6566616720368104
In : intRDD.count()
Out: 11
In : intRDD.sum()
Out: 42
In : intRDD.mean()
Out: 3.8181818181818183
1.2.4、RDD key-value 基本"转换"运算
Spark RDD 支持键值(key-value)的运算,这也是Map/Reduce的基础;
[*]创建 key-value RDD
In : kvRDD1 = sc.parallelize([(1,2),(3,4),(5,6),(7,8),(9,10)])
In : kvRDD1.collect()
Out: [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
[*]列出全部key值
In : kvRDD1.keys().collect()
Out:
[*]列出全部的value值
In : kvRDD1.values().collect()
Out:
[*]使用filter 筛选key
In : kvRDD1.filter(lambda keyValue:keyValue
页:
[1]