[Experience sharing] Storm source code analysis: how Storm uses ZooKeeper

Posted on 2015-9-6 08:10:57
  backtype.storm.cluster.clj defines how Storm uses ZooKeeper.
  
  ClusterState
  It first defines the interface for operating on the ZooKeeper cluster:
  
(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  (set-data [this path data])  ;; if node does not exist, create persistent with this data
  (get-data [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
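To make the protocol's contract concrete, here is a minimal in-memory sketch of four of its operations. It is hypothetical, not Storm code: nodes live in an atom mapping path strings to data instead of ZooKeeper, the watch? argument is dropped because there is nothing to watch, and the names MiniClusterState, mk-memory-state and parent-of are made up for illustration. set-data follows the comment in the protocol: update the node if it exists, otherwise create it, creating missing parents the way mkdirs would.

```clojure
(require '[clojure.string :as str])

;; Hypothetical stand-in for part of ClusterState: no ZooKeeper, no watches.
(defprotocol MiniClusterState
  (set-data [this path data])
  (get-data [this path])
  (get-children [this path])
  (mkdirs [this path]))

(defn parent-of
  "Parent of an absolute path: \"/a/b\" -> \"/a\", \"/a\" -> \"/\"."
  [path]
  (let [idx (str/last-index-of path "/")]
    (if (pos? idx) (subs path 0 idx) "/")))

(defn mk-memory-state []
  (let [nodes (atom {"/" nil})] ; path -> data; the root always exists
    (reify MiniClusterState
      (mkdirs [this path]
        ;; create missing ancestors first, then the node itself (with nil data)
        (when-not (contains? @nodes path)
          (mkdirs this (parent-of path))
          (swap! nodes (fn [m] (if (contains? m path) m (assoc m path nil))))))
      (set-data [this path data]
        ;; update the node if it exists, otherwise create it and its parents
        (mkdirs this (parent-of path))
        (swap! nodes assoc path data))
      (get-data [this path]
        (get @nodes path))
      (get-children [this path]
        ;; names of direct children only, sorted
        (let [prefix (if (= path "/") "/" (str path "/"))]
          (->> (keys @nodes)
               (filter #(and (str/starts-with? % prefix)
                             (not= % path)
                             (not (str/includes? (subs % (count prefix)) "/"))))
               (map #(subs % (count prefix)))
               sort
               vec))))))
```

For example, after (set-data st "/storm/assignments/topo-1" "v1") on a fresh (mk-memory-state), (get-children st "/storm/assignments") returns ["topo-1"] and (get-children st "/") returns ["storm"].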
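  Next comes the implementation, mk-distributed-cluster-state, which builds the object (via reify) used to operate on the ZooKeeper cluster.
It first creates a temporary zk client, creates the STORM-ZOOKEEPER-ROOT directory on ZooKeeper, and closes that client.
It then defines:
    callbacks, the map of registered callbacks (id -> callback)
    active, a flag that stays true until close is called; once it is false, the watcher ignores all events
    zk, the zk client used from then on, rooted at STORM-ZOOKEEPER-ROOT
  When this zk client is created, a watcher is installed: whenever state changes, the zk server sends the client an event, and the watcher passes that event to every registered callback. Events of type :none are skipped, and a warning is logged when the client is not in the :connected state.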

  Storm operates on ZooKeeper through CuratorFramework (http://curator.incubator.apache.org/curator-framework/index.html).

  Finally, it implements the ClusterState protocol: register and unregister add and remove callbacks, and the remaining methods are routine zk operations.
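The callback bookkeeping described above can be sketched without ZooKeeper. In this hypothetical version, mk-dispatcher returns a map of plain functions instead of a ClusterState, the :watcher function is called by hand with the same (state type path) arguments the real watcher receives, and the event-type keywords are illustrative. It shows three behaviors taken from the source: register hands out a uuid per callback, every event except :none goes to all registered callbacks, and nothing is dispatched once close has reset active.

```clojure
;; Hypothetical sketch of the callbacks/active/watcher trio; no ZooKeeper involved.
(defn mk-dispatcher []
  (let [callbacks (atom {})   ; id -> (fn [type path])
        active    (atom true)]
    {:register   (fn [callback]
                   (let [id (str (java.util.UUID/randomUUID))]
                     (swap! callbacks assoc id callback)
                     id))
     :unregister (fn [id] (swap! callbacks dissoc id))
     :close      (fn [] (reset! active false))
     :watcher    (fn [state type path]
                   (when @active
                     (when-not (= :connected state)
                       (println "Received event" state type path "with disconnected Zookeeper."))
                     ;; type :none marks a connection-state change, not a node change
                     (when-not (= :none type)
                       (doseq [callback (vals @callbacks)]
                         (callback type path)))))}))
```

Keeping callbacks in a map keyed by uuid is what lets unregister remove exactly one callback without disturbing the others.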



(defn mk-distributed-cluster-state [conf]
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  (let [callbacks (atom {})
        active (atom true)
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    (reify
      ClusterState
      (register [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          id))
      (unregister [this id]
        (swap! callbacks dissoc id))
      (set-ephemeral-node [this path data]
        (zk/mkdirs zk (parent-path path))
        (if (zk/exists zk path false)
          (try-cause
            (zk/set-data zk path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk path data :ephemeral)))
          (zk/create-node zk path data :ephemeral)))
      (create-sequential [this path data]
        (zk/create-node zk path data :sequential))
      (set-data [this path data]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk path false)
          (zk/set-data zk path data)
          (do
            (zk/mkdirs zk (parent-path path))
            (zk/create-node zk path data :persistent))))
      (delete-node [this path]
        (zk/delete-recursive zk path))
      (get-data [this path watch?]
        (zk/get-data zk path watch?))
      (get-children [this path watch?]
        (zk/get-children zk path watch?))
      (mkdirs [this path]
        (zk/mkdirs zk path))
      (close [this]
        (reset! active false)
        (.close zk)))))
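set-ephemeral-node checks whether the node exists and then calls set-data, but ZooKeeper removes an ephemeral node when the session that created it ends, so the node can vanish between the two calls; the source handles that by catching KeeperException$NoNodeException and creating the node instead. The sketch below reproduces only that control flow. It is hypothetical: an atom stands in for ZooKeeper, an ex-info tagged ::no-node stands in for NoNodeException, ephemerality itself is not modeled, and before-set is a test hook that forces the race.

```clojure
;; Hypothetical sketch of set-ephemeral-node's check-then-set with fallback.
(defn exists? [store path] (contains? @store path))

(defn set-existing!
  "Like ZooKeeper's setData: fails if the node is absent."
  [store path data]
  (when-not (contains? @store path)
    (throw (ex-info "no node" {:type ::no-node :path path})))
  (swap! store assoc path data))

(defn create-node! [store path data]
  (swap! store assoc path data))

(defn set-ephemeral-node!
  "Check-then-set, falling back to create if the node vanished in between.
  before-set is a test hook that runs between the check and the set."
  [store path data & {:keys [before-set]}]
  (if (exists? store path)
    (try
      (when before-set (before-set))
      (set-existing! store path data)
      (catch clojure.lang.ExceptionInfo e
        (if (= ::no-node (:type (ex-data e)))
          (do (println "node disappeared between exists check and set; recreating")
              (create-node! store path data))
          (throw e))))
    (create-node! store path data)))
```

Like the original (see its "should verify that it's ephemeral" comment), the sketch never checks what kind of node it is updating.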
  

StormClusterState
  Defines the zk protocol tailored to Storm, covering reads and writes of the various pieces of Storm state kept in zk.



(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id])  ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id error])
  (errors [this storm-id task-id])
  (disconnect [this]))
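These methods read and write nodes under a handful of subtrees, which mk-storm-cluster-state creates with mkdirs. Because the client in mk-distributed-cluster-state is created with :root (conf STORM-ZOOKEEPER-ROOT), such paths are relative to that root. The sketch below shows one plausible layout. The root string values and the helpers assignment-path, storm-path, supervisor-path and workerbeat-path are assumptions modeled on the constant names in the source; check backtype.storm.cluster for the real definitions.

```clojure
;; Hypothetical path layout; the string values are assumptions, not copied from Storm.
(def ASSIGNMENTS-ROOT "assignments")
(def STORMS-ROOT "storms")
(def SUPERVISORS-ROOT "supervisors")
(def WORKERBEATS-ROOT "workerbeats")
(def ERRORS-ROOT "errors")

;; *-SUBTREE is the absolute form of each root (relative to STORM-ZOOKEEPER-ROOT)
(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
(def STORMS-SUBTREE (str "/" STORMS-ROOT))
(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))

;; Per-entity nodes hang directly off their subtree (hypothetical helpers)
(defn assignment-path [storm-id] (str ASSIGNMENTS-SUBTREE "/" storm-id))
(defn storm-path [storm-id] (str STORMS-SUBTREE "/" storm-id))
(defn supervisor-path [supervisor-id] (str SUPERVISORS-SUBTREE "/" supervisor-id))
(defn workerbeat-path [storm-id node port]
  (str WORKERBEATS-SUBTREE "/" storm-id "/" node "-" port))
```

With this layout, a change to the children of "/assignments" means the set of topologies changed, while a change to "/assignments/<storm-id>" concerns one topology's assignment; the dispatching callback in mk-storm-cluster-state distinguishes exactly these two cases.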
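  mk-storm-cluster-state first checks whether its argument already satisfies ClusterState, i.e., whether a zk cluster state has already been created. If not, it treats the argument as the conf and calls mk-distributed-cluster-state; solo? records which of the two cases applies.
Next, it defines a set of callback atoms and calls cluster-state's register to add one dispatching callback to the callbacks list;
       state-id is the uuid that register returns for that callback.
Then it creates Storm's subdirectories on zk.
Finally, it implements the StormClusterState protocol, i.e., the reads and writes of the various pieces of zk data.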



(defn mk-storm-cluster-state [cluster-state-spec]
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        assignment-info-callback (atom {})
        supervisors-callback (atom nil)
        assignments-callback (atom nil) ; set in StormClusterState.assignments
        storm-base-callback (atom {})
        state-id (register
                  cluster-state
                  (fn [type path]
                    (let [[subtree & args] (tokenize-path path)] ; split path on '/'
                      (condp = subtree ; switch on the first path segment
                        ASSIGNMENTS-ROOT (if (empty? args)
                                           ;; issue-callback! clears the callback and runs it, so it fires only once
                                           (issue-callback! assignments-callback)
                                           (issue-map-callback! assignment-info-callback (first args)))
                        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                        ;; this should never happen
                        (halt-process! 30 "Unknown callback for subtree " subtree args)))))]
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    (reify
      StormClusterState
      ;; ... method implementations omitted in this excerpt ...
      )))
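The dispatching callback that mk-storm-cluster-state registers can be exercised on its own. The sketch below is hypothetical and self-contained: the root strings are assumed values, tokenize-path, issue-callback! and issue-map-callback! are written here from the descriptions in the comments above (split on '/', clear the callback and then run it), and the fallback branch throws instead of calling halt-process!. It needs Clojure 1.9+ for reset-vals! and swap-vals!, which take the old callback out of the atom and clear it in one atomic step.

```clojure
(require '[clojure.string :as str])

;; Assumed root names (hypothetical values)
(def ASSIGNMENTS-ROOT "assignments")
(def STORMS-ROOT "storms")
(def SUPERVISORS-ROOT "supervisors")

(defn tokenize-path
  "Split on '/' and drop empty pieces: \"/assignments/t1\" -> [\"assignments\" \"t1\"]."
  [path]
  (vec (remove empty? (str/split path #"/"))))

(defn issue-callback!
  "Take the callback out of cb-atom (leaving nil), then run it: fires at most once."
  [cb-atom]
  (let [[cb _] (reset-vals! cb-atom nil)]
    (when cb (cb))))

(defn issue-map-callback!
  "Same idea for a map of id -> callback: remove the entry, then call it with id."
  [cb-atom id]
  (let [[old _] (swap-vals! cb-atom dissoc id)]
    (when-let [cb (get old id)] (cb id))))

(defn mk-dispatch [assignments-callback assignment-info-callback
                   supervisors-callback storm-base-callback]
  (fn [type path]
    (let [[subtree & args] (tokenize-path path)]
      (condp = subtree
        ASSIGNMENTS-ROOT (if (empty? args)
                           (issue-callback! assignments-callback)
                           (issue-map-callback! assignment-info-callback (first args)))
        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
        ;; Storm halts the process here; the sketch throws instead
        (throw (ex-info "Unknown callback for subtree" {:subtree subtree :args args}))))))
```

Because issue-callback! empties the slot, dispatching "/assignments" twice runs the assignments callback only once until someone sets it again.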
  

Example
  A concrete scenario shows how Storm uses ZooKeeper.
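  In the supervisor, mk-synchronize-supervisor downloads the code of newly assigned topologies and deletes the code of topologies that are no longer used.
Running this logic once is therefore not enough; it has to run again every time the assignments change.
  Storm solves this with ZooKeeper watchers: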
1. When mk-distributed-cluster-state creates the zk client, it installs a watcher; whenever an event arrives from the zk server, the watcher hands it to the callbacks in the callbacks list.

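2. mk-storm-cluster-state adds one callback to cluster-state's callback list.
    That callback looks at the path in the event (which tells which part of the data changed) and issues the matching callback among those kept in storm-cluster-state.
    For example, when the ASSIGNMENTS-ROOT node changes, assignments-callback is called.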
3. So it is enough to set mk-synchronize-supervisor's sync function as assignments-callback to guarantee that it runs and resynchronizes the topology code whenever ASSIGNMENTS-ROOT changes.
   When is it set? The first time mk-synchronize-supervisor runs:


sync-callback (fn [& ignored] (.add event-manager this))
assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
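  Synchronizing topology code takes time, so it runs in the background: sync-callback only adds this (the sync function itself) to the event-manager's queue, and a background thread executes it.
   In addition, when assignments is called to obtain assignments-snapshot, sync-callback is set as assignments-callback: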



(assignments [this callback]
  (when callback
    (reset! assignments-callback callback))
  (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
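  By the way, get-children is called differently depending on whether there is a callback, i.e., whether the read is watched (see the zk wrapper below). The children returned are the same either way; the watched variant additionally leaves a one-time ZooKeeper watch on the node, so the next change to its children is reported to the watcher installed in mk-distributed-cluster-state.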
  
  
  
  
  



(defn get-children [^CuratorFramework zk ^String path watch?]
  (if watch?
    (.. zk (getChildren) (watched) (forPath (normalize-path path)))
    (.. zk (getChildren) (forPath (normalize-path path)))))
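  4. As mentioned above, issue-callback! clears assignments-callback before running it, so if the callback must keep firing, assignments-callback has to be set again every time.
    That is why, when mk-synchronize-supervisor runs as the callback, it first calls assignments-snapshot, which re-sets assignments-callback.
    Why use this mechanism at all? I can't see that clearly yet. One plausible reason is that ZooKeeper watches are themselves one-time triggers, so a watched read has to be reissued after every notification anyway; a one-shot callback slot that is re-armed on each read mirrors that behavior.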
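The whole loop from steps 1 to 4, a one-shot assignments-callback that the sync function re-arms on every run, can be simulated in a few lines. This is a hypothetical sketch, not supervisor code: an atom of child names stands in for the assignments subtree, on-assignments-changed! stands in for the watcher plus dispatch path, a LinkedBlockingQueue drained by hand with drain! stands in for the event-manager's background thread, and synchronize-supervisor here only records what it saw. reset-vals! requires Clojure 1.9+.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(def assignments-callback (atom nil))     ; one-shot slot, as in mk-storm-cluster-state
(def zk-assignments (atom #{}))           ; fake children of the assignments node
(def event-queue (LinkedBlockingQueue.))  ; fake event-manager queue
(def synced (atom []))                    ; what each sync run observed

(defn assignments
  "Like StormClusterState.assignments: arm the callback (if any), then read."
  [callback]
  (when callback (reset! assignments-callback callback))
  (sort @zk-assignments))

(defn issue-callback! [cb-atom]
  (let [[cb _] (reset-vals! cb-atom nil)] ; clear the slot, then run what was there
    (when cb (cb))))

(defn on-assignments-changed!
  "Stands in for the watcher + dispatch path when the assignments change."
  [new-children]
  (reset! zk-assignments new-children)
  (issue-callback! assignments-callback))

(defn synchronize-supervisor []
  ;; sync-callback only enqueues; the expensive work runs later, off the watcher
  (let [sync-callback (fn [& _] (.add event-queue synchronize-supervisor))
        snapshot (assignments sync-callback)] ; re-arms before reading
    (swap! synced conj (vec snapshot))))

(defn drain!
  "Run queued functions until the queue is empty (the 'background thread')."
  []
  (loop []
    (when-let [f (.poll event-queue)]
      (f)
      (recur))))
```

Running (synchronize-supervisor), then (on-assignments-changed! #{"t1"}), (on-assignments-changed! #{"t1" "t2"}), (drain!), (on-assignments-changed! #{"t2"}), (drain!) leaves @synced as [[] ["t1" "t2"] ["t2"]]. The second change arrives while the slot is empty and triggers nothing, yet nothing is lost in this sketch: the queued sync re-reads the full children list and sees both topologies.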

Original post: https://www.yunweiku.com/thread-109867-1-1.html