设为首页 收藏本站
查看: 1355|回复: 0

[经验分享] 9.spark core之共享变量

[复制链接]

尚未签到

发表于 2019-1-30 13:08:55 | 显示全部楼层 |阅读模式
简介
    spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。


  • 集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
  • 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。
    spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。


  • 广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
  • 累加器用于在驱动器中对数据结果进行聚合。

广播变量

原理
DSC0000.jpg



  • 广播变量只能在Driver端定义,不能在Executor端定义。
  • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
  • 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

用法


  • 通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
  • 通过value属性访问该对象的值
  • 变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)
    实例
        查询每个国家的呼号个数

    python

# 将呼号前缀(国家代码)作为广播变量
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
scala

// 将呼号前缀(国家代码)作为广播变量
val signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count)
val countryContactCounts = contactCounts.map{case (sign, count) => {
val country = lookupInArray(sign, signPrefixes.value)
(country, count)
}}.reduceByKey((x, y) => x+y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
java

// 将呼号前缀(国家代码)作为广播变量
final Broadcast signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD countryContactCounts = contactCounts.mapToPair(new PairFunction() {
public Tuple2 call(Tuple2 callSignCount) {
String sign = callSignCount._1();
String country = lookupCountry(sign, signPrefixes.value());
return new Tuple2(country, callSignCount._2());
}
}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
累加器

原理
DSC0001.jpg



  • 累加器在Driver端定义赋初始值。
  • 累加器只能在Driver端读取最后的值,在Excutor端更新。

用法


  • 通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
  • Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
  • 驱动器程序可以调用累加器的value属性来访问累加器的值

实例
    累加空行

python

file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)
def extractCallSigns(line):
global blankLines # 访问全局变量
if (line == ""):
blankLines += 1
return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value
scala

val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0
val callSigns = file.flatMap(line => {
if (line == "") {
blankLines += 1 //累加器加1
}
line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)
java

JavaRDD rdd = sc.textFile(args[1]);
final Accumulator blankLines = sc.accumulator(0);
JavaRDD callSigns = rdd.flatMap(new FlatMapFunction() {
public Iterable call(String line) {
if ("".equals(line)) {
blankLines.add(1);
}
return Arrays.asList(line.split(" "));
}
});
callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());
  忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669647-1-1.html 上篇帖子: yarn模式运行spark作业所有属性详解 下篇帖子: 大数据处理为何选择Spark,而不是Hadoop
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表