设为首页 收藏本站
查看: 1159|回复: 0

[经验分享] (版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

[复制链接]

尚未签到

发表于 2019-1-31 07:21:14 | 显示全部楼层 |阅读模式
  本期内容:
      1、Spark Streaming资源动态分配
      2、Spark Streaming动态控制消费速率
  
为什么需要动态?

a)
Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Streaming而言有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费。
  
b) Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素。

Spark Streaming资源动态调整的时候会面临挑战:

Spark Streaming是按照Batch Duration运行的,Batch Duration需要很多资源,下一次Batch Duration就不需要那么多资源了,调整资源的时候还没调整完Batch Duration运行就已经过期了。这个时候调整时间间隔。


  


  
Spark Streaming资源动态申请

1. 在SparkContext中默认是不开启动态资源分配的,但是可以通过手动在SparkConf中配置。

  

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())  设置spark.dynamicAllocation.enabled参数为true
  
这里会通过实例化ExecutorAllocationManager对
象来动态分配资源,其内部是有定时器会不断的去扫描Executor的情况,通过线程池的方式调用schedule()来完成资源动态分配。
  
/**
* Register for scheduler callbacks to decide when to add and remove executors, and start
* the scheduling task.
*/
def start(): Unit = {
  listenerBus.addListener(listener)

val scheduleTask = new Runnable() {
override def run(): Unit = {
try {
        schedule()
//动态调整Executor分配数量
      }
catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
          logWarning(
s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
  
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now)
//更新Executor数量

removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
removeExecutor(executorId)
    }
    !expired
  }
}
/**
* Updates our target number of executors and syncs the result with the cluster manager.
*
* Check to see whether our existing allocation and the requests we've made previously exceed our
* current needs. If so, truncate our target and let the cluster manager know so that it can
* cancel pending requests that are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
*
@return the delta in the target number of executors.
*/
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded

if (initializing) {
// Do not change our target while we are still initializing,
    // Otherwise the first job may have to ramp up unnecessarily
0
} else if (maxNeeded < numExecutorsTarget) {
// The target number exceeds the number we actually need, so stop adding new
    // executors and inform the cluster manager to cancel the extra pending requests
val oldNumExecutorsTarget = numExecutorsTarget
    numExecutorsTarget
= math.max(maxNeeded, minNumExecutors)
numExecutorsToAdd = 1

// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
    }
numExecutorsTarget - oldNumExecutorsTarget
  } else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
    logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
addTime += sustainedSchedulerBacklogTimeoutS * 1000
delta
  } else {
0
}
}  
动态控制消费速率:

Spark Streaming提供了一种弹性机制,流进来的速度和处理速度的关系,是否来得及处理数据。如果不能来得及的话,他会自动动态控制数据流进来的速度,
spark.streaming.backpressure.enabled
参数设置。


  


  
备注:
资料来源于:DT_大数据梦工厂(Spark发行版本定制)
更多私密内容,请关注微信公众号:DT_Spark
如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580
  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669785-1-1.html 上篇帖子: spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式 下篇帖子: 第82课:Spark Streaming第一课:案例动手实战并在电光石火间理解其工作原理
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表