1 RDD的依赖关系及容错
1.1 RDD的依赖关系
RDD的 依赖关系分为两种:窄依赖 (Narrow Dependencies)与宽依赖 (Wide Dependencies , 源码中称为 Shuffle Dependencies)
依赖有2个作用,其一 用来解决数据容错的高效性 ; 其二用来划分 stage。
窄依赖 :每个父 RDD的一个 Partition 最多被子 RDD 的一个 Partition 所使用 ( 1:1 或 n:1 )。例如 map、 filter 、 union 等操作都会产生窄依赖;
子 RDD分区通常对应常数个父 RDD 分区 (O(1) ,与数据规模无关 。
宽依赖 :一个父 RDD的 Partition 会被多个子 RDD 的 Partition 所使用,例如 groupByKey 、 reduceByKey 、 sortByKey 等操作都会产生宽依赖; ( 1:m 或 n:m )
(子 RDD分区通常对应所有的父 RDD 分区 (O(n) ,与数据规模有关 )
相比于宽依赖,窄依赖对优化很有利 ,主要基于以下两点:
1、宽依赖往往对应着 shuffle操作,需要在运行过程中将同一个父 RDD 的分区传入到不同的子 RDD 分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父 RDD 的分区只会传入到一个子 RDD 分区中,通常可以在一个节点内完成转换。
2、当 RDD分区丢失时(某个节点故障), spark 会对数据进行重算。
Ø 对于窄依赖,由于父 RDD的一个分区只对应一个子 RDD 分区,这样只需要重算和子 RDD 分区对应的父 RDD 分区即可,所以这个重算对数据的利用率是 100% 的;
Ø 对于宽依赖,重算的父 RDD分区对应多个子 RDD 分区,这样实际上父 RDD 中只有一部分的数据是被用于恢复这个丢失的子 RDD 分区的,另一部分对应子 RDD 的其它未丢失分区,这就造成了多余的计算;更一般的,宽依赖中子 RDD 分区通常来自多个父 RDD 分区,极端情况下,所有的父 RDD 分区都要进行重新计算。
Ø 如下图所示, b1分区丢失,则需要重新计算 a1,a2 和 a3 ,这就产生了冗余计算 (a1,a2,a3 中对应 b2 的数据 ) 。
区分这两种依赖很有用。首先,窄依赖允许在一个集群节点上以流水线的方式( pipeline)计算所有父分区。例如,逐个元素地执行 map 、然后 filter 操作;而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行 Shuffle ,这与 MapReduce 类似。第二,窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失 RDD 分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的 Lineage 图,单个节点失效可能导致这个 RDD 的所有祖先丢失部分分区,因而需要整体重新计算。
【误解】之前一直理解错了,以为窄依赖中每个子 RDD可能对应多个父 RDD, 当子 RDD 丢失时会导致多个父 RDD 进行重新计算,所以窄依赖不如宽依赖有优势。 而实际上应该深入到分区级别去看待这个问题,而且 重算的效用也不在于算的多少,而在于有多少是冗余的计算 。窄依赖中需要重算的都是必须的,所以重算不冗余。
窄依赖的函数有: map、 filter、 union、 join(父 RDD 是 hash-partitioned ) 、 mapPartitions、 mapValues
宽依赖的函数有: groupByKey、 join(父 RDD 不是 hash-partitioned ) 、 partitionBy
1.2 依赖样例
依赖的继承关系:
val rdd1 = sc.parallelize(1 to 10, 1)
val rdd2 = sc.parallelize(11 to 20, 1)
val rdd3 = rdd1.union(rdd2)
rdd3.dependencies.size
// 长度为 2 ,值为 rdd1 、 rdd2 ,意为 rdd3 依赖 rdd1 、 rdd2
rdd3.dependencies
// 结果:
rdd3.dependencies(0).rdd.collect
// 打印 rdd1 的数据
rdd3.dependencies(1).rdd.collect
// 打印 rdd2 的数据
rdd3.dependencies(3).rdd.collect
// 数组越界,报错
哪些RDD Actions对应shuffleDependency ?下面的join (r5 )好像就没有shuffleDependency
val r1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"))
val r2 = r1.keyBy(_.length)
val r3 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"))
val r4 = r3.keyBy(_.length)
val r5 = r2.join(r4)
回答:join不一定会有shuffleDependency ,上面的操作中就没有。
redueceByKey会产生shuffleDependency 。
注意上面操作中的keyBy,和我的想象不太一样。要注意一下。
keyBy:与map 操作较为类似,给每个元素增加了一个key
以下这个例子有点意思:
val r1 = sc.textFile("hdfs:///user/hadoop/data/block_test1.csv")
r1
val r2 = r1.dependencies(0).rdd
r2.partitions.size
r2.preferredLocations(r2.partitions(0))
r2.preferredLocations(r2.partitions(3))
有意思的地方在于(查找依赖、优先位置):
1、r1 的类型为 MapPartitionsRDD
2、r1 依赖于r2 ,如果没有这个赋值语句是看不出来的。r2 的类型为: HadoopRDD
3、可以检索r2 各个分区的位置,该hdfs 文件系统的副本数设置为2
1.3 RDD的容错(lineage、checkpoint)
一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新( CheckPoint Data,和 Logging The Updates )。
面向大规模数据分析,数据检查点操作成本很高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源。
因此, Spark选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此, RDD只支持粗粒度转换,即只记录单个块上执行的单个操作 ( 记录如何从其他 RDD转换而来 , 即 lineage) ,然后将创建 RDD的一系列变换序列(每个 RDD 都包含了他是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称 “ 血统 (Lineage)” 容错)记录下来,以便恢复丢失的分区。
Lineage本质上很类似于数据库中的重做日志( Redo Log ),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。
Lineage容错原理: 在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父 RDD分区重算即可,不依赖于其他节点。而宽依赖需要父 RDD 的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子 RDD 的分区丢失、重算父 RDD 分区时,父 RDD 相应分区的所有数据都是子 RDD 分区的数据,并不存在冗余计算。在宽依赖情况下,丢失一个子 RDD 分区重算的每个父 RDD 的每个分区的所有数据并不是都给丢失的子 RDD 分区用的,会有一部分数据相当于对应的是未丢失的子 RDD 分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此如果使用 Checkpoint 算子来做检查点,不仅要考虑 Lineage 是否足够长,也要考虑是否有宽依赖,对宽依赖加 Checkpoint 是最物有所值的。
Checkpoint机制。在以下2 种情况下,RDD 需要加检查点:
Ø DAG中的 Lineage 过长,如果重算,则开销太大(如在 多次 迭代中)
Ø 在宽依赖上做 Checkpoint获得的收益更大
由于 RDD是只读的,所以 Spark 的 RDD 计算中一致性不是主要关心的内容,内存相对容易管理,这也是设计者很有远见的地方,这样减少了框架的复杂性,提升了性能和可扩展性,为以后上层框架的丰富奠定了强有力的基础。
在 RDD计算中,通过检查点机制进行容错,传统做检查点有两种方式:通过冗余数据和日志记录更新操作。在 RDD 中的 doCheckPoint 方法相当于通过冗余数据来缓存数据,而之前介绍的血统就是通过相当粗粒度的记录更新操作来实现容错的。
检查点(本质是通过将 RDD写入 Disk 做检查点)是为了通过 lineage 做容错的辅助 , Lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的 RDD 开始重做 Lineage ,就会减少开销 。
1.4 checkpoint与cache的关系
1、从本质上说: checkpoint是容错机制;cache 是优化机制
2、 checkpoint将数据写到共享存储中(hdfs ) ;cache通常是内存中
3、运算时间很长或运算量太大才能得到的 RDD, computing chain 过长或依赖其他 RDD 很多的 RDD , 需要做 checkpoint。会被重复使用的(但不能太大)RDD ,做cache 。
实际上,将 ShuffleMapTask 的输出结果存放到本地磁盘也算是 checkpoint ,只不过这个 checkpoint 的主要目的是去 partition 输出数据。
4、RDD 的checkpoint 操作完成后会斩断lineage ,cache 操作对lineage 没有影响。
checkpoint 在 Spark Streaming 中特别重要, spark streaming 中对于一些有状态的操作,这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches ,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链,必须隔一段时间进行一次 checkpoint 。
cache 和 checkpoint 是有显著区别的 , 缓存把 RDD 计算出来然后放在内存中, 但是 RDD 的依赖链(相当于数据库中的 redo 日志),也不能丢掉,当某个点某个 executor 宕了,上面 cache 的 RDD 就会丢掉,需要通过依赖链重放计算出来,不同的是, checkpoint 是把 RDD 保存在 HDFS 中,是多副本可靠存储,所以依赖链就可以丢掉了,即斩断了依赖链,是通过复制实现的高容错。
注意: checkpoint需要把 job 重新从头算一遍,最好先 cache 一下, checkpoint 就可以直接保存缓存中的 RDD 了,就不需要重头计算一遍了,对性能有极大的提升。
1.5 checkpoint的使用与流程
checkpoint 的正确使用姿势
val data = sc.textFile("/tmp/spark/1.data").cache() // 注意要 cache
sc.setCheckpointDir("/tmp/spark/checkpoint")
data.checkpoint
data.count
//问题 : cache 和checkpoint有没有先后的问题;有了cache 可以避免第二次计算,我在代码中可以看见相关的说明!!!
使用很简单, 就是设置一下 checkpoint 目录,然后再 rdd 上调用 checkpoint 方法, action 的时候就对数据进行了 checkpoint
checkpoint 写流程
RDD checkpoint 过程中会经过以下几个状态,
[ Initialized –> marked for checkpointing –> checkpointing in progress –> checkpointed ]
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com