设为首页 收藏本站
查看: 2027|回复: 0

[经验分享] Apache Spark源码走读之24

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-31 07:34:05 | 显示全部楼层 |阅读模式
  欢迎转载,转载请注明出处。

概要
  Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析。

Sort-based Shuffle之初体验
  通过一个小的实验来直观的感受一下sort-based shuffle算法会产生哪些中间文件,具体实验步骤如下所述。
  步骤1: 修改conf/spark-default.conf, 加入如下内容

spark.shuffle.manager SORT

  步骤2: 运行spark-shell

SPARK_LOCAL_IP=127.0.0.1 $SPARK_HOME/bin/spark-shell

  步骤3: 执行wordcount

sc.textFile("README.md").flatMap(l => l.split(" ")).map(w=>(w,1)).reduceByKey(_ + _).collect

  步骤4: 查看生成的中间文件

find /tmp/spark-local* -type f

  文件查找结果如下所示

/tmp/spark-local-20140919091822-aa66/0f/shuffle_0_1_0.index
/tmp/spark-local-20140919091822-aa66/30/shuffle_0_0_0.index
/tmp/spark-local-20140919091822-aa66/0c/shuffle_0_0_0.data
/tmp/spark-local-20140919091822-aa66/15/shuffle_0_1_0.data

  可以看到生成了两人种后缀的文件,分别为data和index类型,这两者的用途在后续分析中会详细讲述。
  如果我们做一下对比实验,将shuffle模式改为Hash,再来观看生成的文件,就会找到区别。将原先配置文件中的SORT改为HASH,重新启动spark-shell,执行相同的wordcount之后,在tmp目录下找到的文件列表如下。

/tmp/spark-local-20140919092949-14cc/10/shuffle_0_1_3
/tmp/spark-local-20140919092949-14cc/0f/shuffle_0_1_2
/tmp/spark-local-20140919092949-14cc/0f/shuffle_0_0_3
/tmp/spark-local-20140919092949-14cc/0c/shuffle_0_0_0
/tmp/spark-local-20140919092949-14cc/0d/shuffle_0_1_0
/tmp/spark-local-20140919092949-14cc/0d/shuffle_0_0_1
/tmp/spark-local-20140919092949-14cc/0e/shuffle_0_1_1
/tmp/spark-local-20140919092949-14cc/0e/shuffle_0_0_2

  两者生成的文件数量差异非常大,具体数值计算如下


  • 在HASH模式下,每一次shuffle会生成M*R的数量的文件,如上述wordcount例子中,整个job有一次shuffle过程,由于输入文件默认分片为2,故M个数为2,而spark.default.parallelism配置的值为4,故R为4,所以总共生成1*2*4=8个文件。shuffle_0_1_2解读为shuffle+shuffle_id+map_id+reduce_id,故0_1_2表示由第0次shuffle中的第1个maptask生成的文件,该文件内容会被第2个reduce task消费
  • 在SORT模式下,一个Map Task只生成一个文件,而不管生成的文件要被多少的Reduce消费,故文件个数是M的数量,由于wordcount中的默认分片为2,故只生成两个data文件

多次shuffle
  刚才的示例中只有一次shuffle过程,我们可以通过小小的改动来达到两次shuffle,代码如下

sc.textFile("README.md").flatMap(l => l.split(" ")).map(w => (w,1)).reduceByKey(_ + _).map(p=>(p._2,p._1)).groupByKey.collect

  上述代码将reduceByKey的结果通过map进行反转,即将原来的(w, count)转换为(count,w),然后根据出现次数进行归类。 groupByKey会再次导致数据shuffle过程。
  在HASH模式下产生的文件如下所示

/tmp/spark-local-20140919094531-1cb6/12/shuffle_0_3_3
/tmp/spark-local-20140919094531-1cb6/0c/shuffle_0_0_0
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_2_3
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_3_2
/tmp/spark-local-20140919094531-1cb6/11/shuffle_1_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_2_2
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_3_1
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_0_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_0_3
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_3_0
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_2_1
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_0_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_1_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_0_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_1_0
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_1_0_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_2_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_1_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_0_2
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_0_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_1_0

  引入一次新的shuffle,产生了大量的中间文件
  如果是使用SORT,效果如何呢?只会增加M个文件,由于在新的shuffle过程中,map task数目为4,所以总共的文件是2+4=6。

/tmp/spark-local-20140919094731-034a/29/shuffle_0_3_0.data
/tmp/spark-local-20140919094731-034a/30/shuffle_0_0_0.index
/tmp/spark-local-20140919094731-034a/15/shuffle_0_1_0.data
/tmp/spark-local-20140919094731-034a/36/shuffle_0_2_0.data
/tmp/spark-local-20140919094731-034a/0c/shuffle_0_0_0.data
/tmp/spark-local-20140919094731-034a/32/shuffle_0_2_0.index
/tmp/spark-local-20140919094731-034a/32/shuffle_1_1_0.index
/tmp/spark-local-20140919094731-034a/0f/shuffle_0_1_0.index
/tmp/spark-local-20140919094731-034a/0f/shuffle_1_0_0.index
/tmp/spark-local-20140919094731-034a/0a/shuffle_1_1_0.data
/tmp/spark-local-20140919094731-034a/2b/shuffle_1_0_0.data
/tmp/spark-local-20140919094731-034a/0d/shuffle_0_3_0.index

  值得指出的是shuffle_0和shuffle_1的执行次序问题,数字越大越先执行,由于spark job提交的时候是从后往前倒推的,故0是最后将执行,而前面的先执行。

Sort-based Shuffle的设计思想
  sort-based shuffle的总体指导思想是一个map task最终只生成一个shuffle文件,那么后续的reduce task是如何从这一个shuffle文件中得到自己的partition呢,这个时候就需要引入一个新的文件类型即index文件。
  其具体实现步骤如下:


  • Map Task在读取自己输入的partition之后,将计算结果写入到ExternalSorter
  • ExternalSorter会使用一个map来存储新的计算结果,新的计算结果根据partiton分类,如果是有combine操作,则需要将新的值与原有的值进行合并
  • 如果ExternalSorter中的map占用的内存已经超越了使用的阀值,则将map中的内容spill到磁盘中,每一次spill产生一个不同的文件
  • 当输入Partition中的所有数据都已经处理完毕之后,这时有可能一部分计算结果在内存中,另一部分计算结果在spill的一到多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里
  • 最后将每一个partition的在data文件中的起始位置和结束位置写入到index文件
  相应的源文件


  • SortShuffleManager.scala
  • SortShuffleWriter.scala
  • ExternalSorter.scala
  • IndexShuffleBlockManager.scala
  几个重要的函数
  SortShuffleWriter.write

  override def write(records: Iterator[_ >: Product2[K, V]]): Unit = {
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
}
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
mapStatus = new MapStatus(blockManager.blockManagerId,
partitionLengths.map(MapOutputTracker.compressSize))
}

  ExternalSorter.insertAll

def insertAll(records: Iterator[_  {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
elementsRead += 1
kv = records.next()
map.changeValue((getPartition(kv._1), kv._1), update)
maybeSpill(usingMap = true)
}
} else {
// Stick values into our buffer
while (records.hasNext) {
elementsRead += 1
val kv = records.next()
buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
maybeSpill(usingMap = false)
}
}
}

  writePartitionedFile将内存中的数据和spill文件中内容一起合并到一个文件当中

def writePartitionedFile(
blockId: BlockId,
context: TaskContext,
outputFile: File): Array[Long] = {
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
if (bypassMergeSort && partitionWriters != null) {
// We decided to write separate files for each partition, so just concatenate them. To keep
// this simple we spill out the current in-memory collection so that everything is in files.
spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
partitionWriters.foreach(_.commitAndClose())
var out: FileOutputStream = null
var in: FileInputStream = null
try {
out = new FileOutputStream(outputFile)
for (i

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-92432-1-1.html 上篇帖子: Apache Spark源码走读之6 下篇帖子: Apache 2 移植到Arm开发板
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表