
[Experience Sharing] Spark Source Code Study

Posted on 2019-1-30 10:31:00
0 About these scattered notes
  These scattered notes cover things I felt were worth writing down so that I can find them again later.

1 Spark version
  Spark 2.1.0.

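2 Notes
  Much of the Scala knowledge that comes up while reading the source is covered in my earlier Scala notes; most of what is needed here was already touched on there.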

3 SparkConf source code

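  The SparkConf source is relatively easy: with a working understanding of Spark itself and a reasonable grasp of Scala, it is not very complicated to read. I only went through the core methods to get an overall picture and added some personal notes; usages I have not run into yet are left for later. This post is a record of that.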
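As orientation before reading the source, here is a minimal sketch of the pattern SparkConf is built on: a thread-safe map of settings, setters that return `this` so calls can be chained, typed getters layered on an `Option`, and a `clone` that copies entries into a fresh object. `MiniConf` and `MiniConfDemo` are hypothetical names invented for this illustration; this is not Spark code, and it assumes Scala 2.13+ for `scala.jdk.CollectionConverters` (Spark 2.1.0 itself uses `scala.collection.JavaConverters`).

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._ // assumption: Scala 2.13 or later

// Hypothetical, heavily simplified stand-in for SparkConf.
class MiniConf extends Cloneable {
  // Thread-safe map that holds the actual key-value settings.
  private val settings = new ConcurrentHashMap[String, String]()

  // Every setter ends in set(), and set() returns `this`: that is what enables chaining.
  def set(key: String, value: String): MiniConf = {
    require(key != null && value != null, "null key or value")
    settings.put(key, value)
    this
  }
  def setMaster(master: String): MiniConf = set("spark.master", master)
  def setAppName(name: String): MiniConf = set("spark.app.name", name)

  // Wrap the possibly-null map lookup in an Option.
  def getOption(key: String): Option[String] = Option(settings.get(key))

  // Typed getter built on getOption, in the same style as getInt in the source.
  def getInt(key: String, defaultValue: Int): Int =
    getOption(key).map(_.toInt).getOrElse(defaultValue)

  // Copy every entry into a new object so that callers never share mutable state.
  override def clone(): MiniConf = {
    val cloned = new MiniConf
    settings.entrySet().asScala.foreach(e => cloned.set(e.getKey, e.getValue))
    cloned
  }
}

object MiniConfDemo {
  // Returns the master seen by the original and by a modified clone.
  def run(): (Option[String], Option[String]) = {
    val conf = new MiniConf().setMaster("local").setAppName("My app")
    val copy = conf.clone().set("spark.master", "yarn")
    (conf.getOption("spark.master"), copy.getOption("spark.master"))
  }
}
```

Changing the clone leaves the original untouched, which is the property that makes handing each component its own copy safe.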


/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
*
* Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
* values from any `spark.*` Java system properties set in your application as well. In this case,
* parameters you set directly on the `SparkConf` object take priority over system properties.
*
* For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
* get the same configuration no matter what the system properties are.
*
* All setter methods in this class support chaining. For example, you can write
* `new SparkConf().setMaster("local").setAppName("My app")`.
* Leaf Note: These properties can be set this elegantly because each of these methods ends by calling set, which returns this, i.e. the object itself.
*
* @param loadDefaults whether to also load values from Java system properties
*
* @note Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
* by the user. Spark does not support modifying the configuration at runtime.
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
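/** Leaf Note:
* extends ... with ...: several traits are mixed in here; a trait is similar to an interface in Java.
* Logging is used for logging. Serializable is for serialization: in a distributed environment a SparkConf object gets passed around, so it has to be serializable.
* As for Cloneable: the class overrides clone() below, which simply produces a new SparkConf object with the same configuration.
* The goal is to avoid concurrency problems when several components share one SparkConf object; every component that needs one is handed its own clone.
* Wherever a SparkConf object is needed, call clone to make a copy, which is quite elegant.
* */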
import SparkConf._
/** Create a SparkConf that loads defaults from system properties and the classpath */
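/** Leaf Note: This is an auxiliary constructor. It delegates to the primary constructor, which in Scala is the one declared in the class definition together with its parameters. */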
def this() = this(true)
// Leaf Note: a thread-safe map; this is what actually stores Spark's configuration properties
private val settings = new ConcurrentHashMap[String, String]()
@transient private lazy val reader: ConfigReader = {
val _reader = new ConfigReader(new SparkConfigProvider(settings))
_reader.bindEnv(new ConfigProvider {
override def get(key: String): Option[String] = Option(getenv(key))
})
_reader
}
if (loadDefaults) {
loadFromSystemProperties(false)
}
/* Leaf Note: the spark in private[spark] refers to the org.apache.spark package, meaning this method can only be used inside that package */
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
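for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value, silent)
}
this
}
// ... (the setters, e.g. set/setMaster/setAppName, and the getters get/getOption are omitted here) ...
/** Get all parameters as a list of pairs */
def getAll: Array[(String, String)] = {
settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray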
}
/**
* Get all parameters that start with `prefix`
*/
def getAllWithPrefix(prefix: String): Array[(String, String)] = {
getAll.filter { case (k, v) => k.startsWith(prefix) }
.map { case (k, v) => (k.substring(prefix.length), v) }
}
/** Get a parameter as an integer, falling back to a default if not set */
def getInt(key: String, defaultValue: Int): Int = {
getOption(key).map(_.toInt).getOrElse(defaultValue)
}
/** Get a parameter as a long, falling back to a default if not set */
def getLong(key: String, defaultValue: Long): Long = {
getOption(key).map(_.toLong).getOrElse(defaultValue)
}
/** Get a parameter as a double, falling back to a default if not set */
def getDouble(key: String, defaultValue: Double): Double = {
getOption(key).map(_.toDouble).getOrElse(defaultValue)
}
/** Get a parameter as a boolean, falling back to a default if not set */
def getBoolean(key: String, defaultValue: Boolean): Boolean = {
getOption(key).map(_.toBoolean).getOrElse(defaultValue)
}
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
getAllWithPrefix("spark.executorEnv.")
}
/**
* Returns the Spark application id, valid in the Driver after TaskScheduler registration and
* from the start in the Executor.
*/
def getAppId: String = get("spark.app.id")
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = {
settings.containsKey(key) ||
configsWithAlternatives.get(key).toSeq.flatten.exists { alt => contains(alt.key) }
}
private[spark] def contains(entry: ConfigEntry[_]): Boolean = contains(entry.key)
/** Copy this object */
/** Clone the configuration in this SparkConf object into a new SparkConf object */
override def clone: SparkConf = {
val cloned = new SparkConf(false)
settings.entrySet().asScala.foreach { e =>
cloned.set(e.getKey(), e.getValue(), true)
}
cloned
}
/**
* By using this instead of System.getenv(), environment variables can be mocked
* in unit tests.
*/
private[spark] def getenv(name: String): String = System.getenv(name)
/**
* Checks for illegal or deprecated config settings. Throws an exception for the former. Not
* idempotent - may mutate this conf object to convert deprecated settings to supported ones.
*/
private[spark] def validateSettings() {
if (contains("spark.local.dir")) {
val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
logWarning(msg)
}
val executorOptsKey = "spark.executor.extraJavaOptions"
val executorClasspathKey = "spark.executor.extraClassPath"
val driverOptsKey = "spark.driver.extraJavaOptions"
val driverClassPathKey = "spark.driver.extraClassPath"
val driverLibraryPathKey = "spark.driver.extraLibraryPath"
val sparkExecutorInstances = "spark.executor.instances"
// Used by Yarn in 1.1 and before
sys.props.get("spark.driver.libraryPath").foreach { value =>
val warning =
s"""
|spark.driver.libraryPath was detected (set to '$value').
|This is deprecated in Spark 1.2+.
|
|Please instead use: $driverLibraryPathKey
""".stripMargin
logWarning(warning)
}
// Validate spark.executor.extraJavaOptions
getOption(executorOptsKey).foreach { javaOpts =>
if (javaOpts.contains("-Dspark")) {
val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
"Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
throw new Exception(msg)
}
if (javaOpts.contains("-Xmx")) {
val msg = s"$executorOptsKey is not allowed to specify max heap memory settings " +
s"(was '$javaOpts'). Use spark.executor.memory instead."
throw new Exception(msg)
}
}
// Validate memory fractions
val deprecatedMemoryKeys = Seq(
"spark.storage.memoryFraction",
"spark.shuffle.memoryFraction",
"spark.shuffle.safetyFraction",
"spark.storage.unrollFraction",
"spark.storage.safetyFraction")
val memoryKeys = Seq(
"spark.memory.fraction",
"spark.memory.storageFraction") ++
deprecatedMemoryKeys
for (key <- memoryKeys) {
val value = getDouble(key, 0.5)
if (value > 1 || value < 0) {
throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
}
}
// Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
if (!legacyMemoryManagement) {
val keyset = deprecatedMemoryKeys.toSet
val detected = settings.keys().asScala.filter(keyset.contains)
if (detected.nonEmpty) {
logWarning("Detected deprecated memory fraction settings: " +
detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " +
"memory management are unified. All memory fractions used in the old model are " +
"now deprecated and no longer read. If you wish to use the old memory management, " +
s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).")
}
}
// Check for legacy configs
sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
val warning =
s"""
|SPARK_JAVA_OPTS was detected (set to '$value').
|This is deprecated in Spark 1.0+.
|
|Please instead use:
| - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
| - ./spark-submit with --driver-java-options to set -X options for a driver
| - spark.executor.extraJavaOptions to set -X options for executors
| - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
""".stripMargin
logWarning(warning)
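for (key <- Seq(executorOptsKey, driverOptsKey)) {
if (getOption(key).isDefined) {
throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
} else {
logWarning(s"Setting '$key' to '$value' as a work-around.")
set(key, value)
}
}
}
sys.env.get("SPARK_CLASSPATH").foreach { value =>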
val warning =
s"""
|SPARK_CLASSPATH was detected (set to '$value').
|This is deprecated in Spark 1.0+.
|
|Please instead use:
| - ./spark-submit with --driver-class-path to augment the driver classpath
| - spark.executor.extraClassPath to augment the executor classpath
""".stripMargin
logWarning(warning)
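for (key <- Seq(executorClasspathKey, driverClassPathKey)) {
if (getOption(key).isDefined) {
throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.")
} else {
logWarning(s"Setting '$key' to '$value' as a work-around.")
set(key, value)
}
}
}
if (!contains(sparkExecutorInstances)) {
sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>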
val warning =
s"""
|SPARK_WORKER_INSTANCES was detected (set to '$value').
|This is deprecated in Spark 1.0+.
|
|Please instead use:
| - ./spark-submit with --num-executors to specify the number of executors
| - Or set SPARK_EXECUTOR_INSTANCES
| - spark.executor.instances to configure the number of instances in the spark config.
""".stripMargin
logWarning(warning)
set("spark.executor.instances", value)
}
}
if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
"instead use \"yarn\" with specified deploy mode."
get("spark.master") match {
case "yarn-cluster" =>
logWarning(warning)
set("spark.master", "yarn")
set("spark.submit.deployMode", "cluster")
case "yarn-client" =>
logWarning(warning)
set("spark.master", "yarn")
set("spark.submit.deployMode", "client")
case _ => // Any other unexpected master will be checked when creating scheduler backend.
}
}
if (contains("spark.submit.deployMode")) {
get("spark.submit.deployMode") match {
case "cluster" | "client" =>
case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
"\"client\".")
}
}
}
/**
* Return a string listing all keys and values, one per line. This is useful to print the
* configuration out for debugging.
*/
def toDebugString: String = {
getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}
}
private[spark] object SparkConf extends Logging {
/**
* Maps deprecated config keys to information about the deprecation.
*
* The extra information is logged as a warning when the config is present in the user's
* configuration.
*/
private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
val configs = Seq(
DeprecatedConfig("spark.cache.class", "0.8",
"The spark.cache.class property is no longer being used! Specify storage levels using " +
"the RDD.persist() method instead."),
DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
"Please use spark.{driver,executor}.userClassPathFirst instead."),
DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
"Please use spark.kryoserializer.buffer instead. The default value for " +
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
"Please use the new blacklisting options, spark.blacklist.*")
)
Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
}
/**
* Maps a current config key to alternate keys that were used in previous version of Spark.
*
* The alternates are used in the order defined in this map. If deprecated configs are
* present in the user's configuration, a warning is logged.
*/
private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
"spark.executor.userClassPathFirst" -> Seq(
AlternateConfig("spark.files.userClassPathFirst", "1.3")),
"spark.history.fs.update.interval" -> Seq(
AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
AlternateConfig("spark.history.fs.updateInterval", "1.3"),
AlternateConfig("spark.history.updateInterval", "1.3")),
"spark.history.fs.cleaner.interval" -> Seq(
AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
"spark.history.fs.cleaner.maxAge" -> Seq(
AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
"spark.yarn.am.waitTime" -> Seq(
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
translation = s => s"${s.toLong * 10}s")),
"spark.reducer.maxSizeInFlight" -> Seq(
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
"spark.kryoserializer.buffer" ->
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${(s.toDouble * 1000).toInt}k")),
"spark.kryoserializer.buffer.max" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
"spark.shuffle.file.buffer" -> Seq(
AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
"spark.executor.logs.rolling.maxSize" -> Seq(
AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
"spark.io.compression.snappy.blockSize" -> Seq(
AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
"spark.io.compression.lz4.blockSize" -> Seq(
AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
"spark.rpc.numRetries" -> Seq(
AlternateConfig("spark.akka.num.retries", "1.4")),
"spark.rpc.retry.wait" -> Seq(
AlternateConfig("spark.akka.retry.wait", "1.4")),
"spark.rpc.askTimeout" -> Seq(
AlternateConfig("spark.akka.askTimeout", "1.4")),
"spark.rpc.lookupTimeout" -> Seq(
AlternateConfig("spark.akka.lookupTimeout", "1.4")),
"spark.streaming.fileStream.minRememberDuration" -> Seq(
AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
"spark.memory.offHeap.enabled" -> Seq(
AlternateConfig("spark.unsafe.offHeap", "1.6")),
"spark.rpc.message.maxSize" -> Seq(
AlternateConfig("spark.akka.frameSize", "1.6")),
"spark.yarn.jars" -> Seq(
AlternateConfig("spark.yarn.jar", "2.0"))
)
/**
* A view of `configsWithAlternatives` that makes it more efficient to look up deprecated
* config keys.
*
* Maps the deprecated config name to a 2-tuple (new config name, alternate config info).
*/
private val allAlternatives: Map[String, (String, AlternateConfig)] = {
configsWithAlternatives.keys.flatMap { key =>
configsWithAlternatives(key).map { cfg => (cfg.key -> (key -> cfg)) }
}.toMap
}
/**
* Return whether the given config should be passed to an executor on start-up.
*
* Certain authentication configs are required from the executor when it connects to
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
name.startsWith("spark.rpc") ||
isSparkPortConf(name)
}
/**
* Return true if the given config matches either `spark.*.port` or `spark.port.*`.
*/
def isSparkPortConf(name: String): Boolean = {
(name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
}
/**
* Looks for available deprecated keys for the given config option, and return the first
* value available.
*/
def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = {
configsWithAlternatives.get(key).flatMap { alts =>
alts.collectFirst { case alt if conf.contains(alt.key) =>
val value = conf.get(alt.key)
if (alt.translation != null) alt.translation(value) else value
}
}
}
/**
* Logs a warning message if the given config key is deprecated.
*/
def logDeprecationWarning(key: String): Unit = {
deprecatedConfigs.get(key).foreach { cfg =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"may be removed in the future. ${cfg.deprecationMessage}")
return
}
allAlternatives.get(key).foreach { case (newKey, cfg) =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"may be removed in the future. Please use the new key '$newKey' instead.")
return
}
if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
logWarning(
s"The configuration key $key is not supported any more " +
s"because Spark doesn't use Akka since 2.0")
}
}
/**
* Holds information about keys that have been deprecated and do not have a replacement.
*
* @param key The deprecated key.
* @param version Version of Spark where key was deprecated.
* @param deprecationMessage Message to include in the deprecation warning.
*/
private case class DeprecatedConfig(
key: String,
version: String,
deprecationMessage: String)
/**
* Information about an alternate configuration key that has been deprecated.
*
* @param key The deprecated config key.
* @param version The Spark version in which the key was deprecated.
* @param translation A translation function for converting old config values into new ones.
*/
private case class AlternateConfig(
key: String,
version: String,
translation: String => String = null)
}
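
The companion object above maps each current key to its deprecated alternatives and, in getDeprecatedConfig, returns the first alternative that is set, passing the old value through an optional translation function. A minimal self-contained sketch of that lookup follows. `Alt`, `AltLookup` and `resolve` are hypothetical names for this illustration, the settings are a plain immutable `Map` rather than a SparkConf, and the rule that the current key wins when both are present is an assumption of the sketch. It also swaps the source's `null` default for an `Option`.

```scala
// Hypothetical simplified version of AlternateConfig: the translation is optional.
final case class Alt(
    key: String,
    version: String,
    translation: Option[String => String] = None)

object AltLookup {
  // Current key -> deprecated alternatives, tried in the order listed.
  val alternatives: Map[String, Seq[Alt]] = Map(
    "spark.yarn.am.waitTime" -> Seq(
      // Old value was a number of tries; translate to a duration at 10s per try.
      Alt("spark.yarn.applicationMaster.waitTries", "1.3",
        Some((s: String) => s"${s.toLong * 10}s"))),
    "spark.reducer.maxSizeInFlight" -> Seq(
      Alt("spark.reducer.maxMbInFlight", "1.4")))

  // Look up `key`; if it is absent, fall back to the first deprecated alternative that is set.
  def resolve(key: String, settings: Map[String, String]): Option[String] =
    settings.get(key).orElse {
      alternatives.get(key).flatMap { alts =>
        alts.collectFirst {
          case alt if settings.contains(alt.key) =>
            val old = settings(alt.key)
            // Apply the translation when there is one, otherwise keep the old value.
            alt.translation.fold(old)(f => f(old))
        }
      }
    }
}
```

For example, with only `spark.yarn.applicationMaster.waitTries` set to `3`, resolving `spark.yarn.am.waitTime` yields `30s`, following the same `s.toLong * 10` translation used in the source.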



