heshao2005 发表于 2019-1-30 11:36:54

Spark 累加器实验

  以下代码用 Pyspark + IPython 完成
  

  统计日志空行的数量:
  读取日志,创建RDD:
myrdd = sc.textFile("access.log")  


[*]  不使用累加器:
In : s = 0
In : def f(x):
    ...:   global s
    ...:   if len(x) == 0:
    ...:         s += 1
    ...:
In : myrdd.foreach(f)
In : print (s)  得出结果为:
0  原因是python 的变量,即使是全局变量不能应用在各个计算进程(线程)中同步数据,所以需要分布式计算框架的变量来同步数据,Spark 中采用累加器来解决:
  


[*]  使用累加器
In : s = sc.accumulator(0)
In : def f(x):
    ...:   global s
    ...:   if len(x) == 0:
    ...:         s += 1
    ...:
In : myrdd.foreach(f)
In : print (s)  得出正确结果:
14  

  




页: [1]
查看完整版本: Spark 累加器实验