设为首页 收藏本站
查看: 1928|回复: 0

[经验分享] Apache Spark源码走读之17

[复制链接]

尚未签到

发表于 2015-7-31 12:57:31 | 显示全部楼层 |阅读模式
  欢迎转载,转载请注明出处,徽沪一郎

概要
  今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读。众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着跟着就觉着线索跟丢掉了,另外Spark基于Akka来进行消息交互,那如何知道谁是接收方呢?

new Throwable().printStackTrace
  代码跟读的时候,经常会借助于日志,针对日志中输出的每一句,我们都很想知道它们的调用者是谁。但有时苦于对spark系统的了解程度不深,或者对scala认识不够,一时半会之内无法找到答案,那么有没有什么简便的办法呢?
  我的办法就是在日志出现的地方加入下面一句话

new Throwable().printStackTrace()

  现在举一个实际的例子来说明问题。
  比如我们在启动spark-shell之后,输入一句非常简单的sc.textFile("README.md"),会输出下述的log

14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)
14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms
14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms
res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13

  那我很想知道是第二句日志所在的tryToPut函数是被谁调用的该怎么办?
  办法就是打开MemoryStore.scala,找到下述语句

logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))

  在这句话之上,添加如下语句

new Throwable().printStackTrace()

  然后,重新进行源码编译

sbt/sbt assembly

  再次打开spark-shell,执行sc.textFile("README.md"),就可以得到如下输出,从中可以清楚知道tryToPut的调用者是谁

14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
14/07/05 19:53:27 WARN MemoryStore: just show the calltrace by entering some modified code
java.lang.Throwable
at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:182)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:699)
at org.apache.spark.storage.BlockManager.put(BlockManager.scala:570)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:821)
at org.apache.spark.broadcast.HttpBroadcast.(HttpBroadcast.scala:52)
at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:787)
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:556)
at org.apache.spark.SparkContext.textFile(SparkContext.scala:468)
at $line5.$read$$iwC$$iwC$$iwC$$iwC.(:13)
at $line5.$read$$iwC$$iwC$$iwC.(:18)
at $line5.$read$$iwC$$iwC.(:20)
at $line5.$read$$iwC.(:22)
at $line5.$read.(:24)
at $line5.$read$.(:28)
at $line5.$read$.()
at $line5.$eval$.(:7)
at $line5.$eval$.()
at $line5.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)
14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms
14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms
res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13

git同步
  对代码作了修改之后,如果并不想提交代码,那该如何将最新的内容同步到本地呢?

git reset --hard
git pull origin master

Akka消息跟踪
  追踪消息的接收者是谁,相对来说比较容易,只要使用好grep就可以了,当然前提是要对actor model有一点点了解。
  还是举个实例吧,我们知道CoarseGrainedSchedulerBackend会发送LaunchTask消息出来,那么谁是接收方呢?只需要执行以下脚本即可。

grep LaunchTask -r core/src/main

  从如下的输出中,可以清楚看出CoarseGrainedExecutorBackend是LaunchTask的接收方,接收到该函数之后的业务处理,只需要去看看接收方的receive函数即可。

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:    case LaunchTask(data) =>
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:        logError("Received LaunchTask command but executor was null")
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))

小结
  今天的内容相对简单,没有技术含量,自己做个记述,免得时间久了,不记得。
  
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-92759-1-1.html 上篇帖子: windows平台下快速配置Apache+PHP服务器环境 下篇帖子: Could not initialize class org.apache.log4j.LogManager 报错
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表