一、概述
上篇blog记录了些在用spark-sql时遇到的一些问题,今天继续记录用Spark提供的RDD转化方法开发公司第一期标签分析系统(一部分scala作业逻辑代码后面blog再给大家分享)遇到的一些SPARK作业错误信息。其中有些问题可能一些数据量或者shuffle量比较小的作业时不会遇到的,我们整套标签系统的初级输入数据大概是8T左右,这里也是个参考。(下面的Spark部署模式为spark on yarn) 二、问题
从这两个错误信息首先可以将错误定位到整个HDFS的读写过程中,其中对于读写超时可以定位到2个参数:dfs.client.socket-timeout(默认60s)、dfs.datanode.socket.write.timeout(默认80s)。在spark的程序中按照自己的实际情况设置这两个值,问题可以解决。给个例子:
val dwd_new_pc_list_patch = "/user/hive/warehouse/pc.db/dwd_new_pc_list/2015-01-*/action=play"
val sparkConf = new SparkConf().setAppName("TagSystem_compositeTag")
.set("spark.kryoserializer.buffer.max.mb", "128").set("spark.rdd.compress","true")
val sc = new SparkContext(sparkConf)
//hdfs客户端的读写超时时间
//默认60000
sc.hadoopConfiguration.set("dfs.client.socket-timeout", "180000")
//默认80000
sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "180000")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val hiveSqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
//(user_id,fo,fo_2,sty,fs)
val source = sc.textFile(dwd_new_pc_list_patch).filter(p => (p.trim != "" && p.split("\\|").length >= 105)).mapPartitions({ it =>
for {
line = targetRequestSize) {
// Add this FetchRequest
remoteRequests += new FetchRequest(address, curBlocks)
curBlocks = new ArrayBuffer[(BlockId, Long)]
logDebug(s"Creating fetch request of $curRequestSize at $address")
curRequestSize = 0
}
}
// Add in the final request
// 将剩余的请求放到最后一个request中。
if (!curBlocks.isEmpty) {
remoteRequests += new FetchRequest(address, curBlocks)
}
}
}
logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
totalBlocks + " blocks")
remoteRequests
} 从代码上看我的个人理解是在shuffle节点每个reduce task会启动5个fetch线程(可以由spark.shuffle.copier.threads配置)去最多spark.reducer.maxMbInFlight个(默认5)其他Excuctor中获取文件位置,然后去fetch它们,并且每次fetch的抓取量不会超过spark.reducer.maxMbInFlight(默认值为48MB)/5。这种机制我个人理解,第一:可以减少单个fetch连接的网络IO、第二:这种将fetch数据并行执行有助于抓取速度提高,减少请求数据的抓取时间总和。
回来结合我现在的问题分析,我将spark.reducer.maxMbInFlight调小,从而减少了每个reduce task中的每个fetch线程的抓取数据量,进而减少了每个fetch连接的持续连接时间,降低了由于reduce task过多导致每个Excutor中存在的fetch线程太多而导致的fetch超时,另外降低内存的占用。
上述分析为个人理解,如有更深入的想法欢迎交流。